diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JsonUtil.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JsonUtil.java index fdb5b52797..eed944f29b 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JsonUtil.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JsonUtil.java @@ -258,7 +258,7 @@ public final class JsonUtil { JsonObjectBuilder jsonObjectBuilder = JsonLoader.createObjectBuilder(); if (map != null) { for (Map.Entry entry : map.entrySet()) { - addToObject(entry.getKey(), entry.getValue(), jsonObjectBuilder); + addToObject(String.valueOf(entry.getKey()), entry.getValue(), jsonObjectBuilder); } } return jsonObjectBuilder.build(); diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java index c2d7ba6a2a..d213446c01 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java @@ -297,13 +297,13 @@ public interface QueueControl { long countMessages() throws Exception; /** - * Counts the number of messages in this queue group by property received in the parameter using JSON serialization. - * In case no has property return amount of messages in the *_UNDEFINED_* + * Counts the number of messages in this queue matching the specified filter, grouped by the given property field. + * In case of null property will be grouped in "null" *
* Using {@code null} or an empty filter will count all messages from this queue. */ - @Operation(desc = "Returns the number of the messages in the separate property value the queue", impact = MBeanOperationInfo.INFO) - String countMessagesProperty(@Parameter(name = "filter", desc = "This filter separate account messages") String filter) throws Exception; + @Operation(desc = "Returns the number of the messages in the queue matching the given filter, grouped by the given property field", impact = MBeanOperationInfo.INFO) + String countMessages(@Parameter(name = "filter", desc = "This filter separate account messages") String filter, @Parameter(name = "groupByProperty", desc = "This property to group by") String groupByProperty) throws Exception; /** * Removes the message corresponding to the specified message ID. diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java index ed467793dc..7377846208 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java @@ -63,7 +63,7 @@ import org.apache.activemq.artemis.utils.collections.LinkedListIterator; public class QueueControlImpl extends AbstractControl implements QueueControl { public static final int FLUSH_LIMIT = 500; - public static final String UNDEFINED = "*_UNDEFINED_*"; + // Constants ----------------------------------------------------- // Attributes ---------------------------------------------------- @@ -683,56 +683,46 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { @Override public long countMessages(final String filterStr) throws Exception { - checkStarted(); + Long value = intenalCountMessages(filterStr, null).get(null); + return value == null ? 0 : value; + } + @Override + public String countMessages(final String filterStr, final String groupByProperty) throws Exception { + return JsonUtil.toJsonObject(intenalCountMessages(filterStr, groupByProperty)).toString(); + } + + private Map intenalCountMessages(final String filterStr, final String groupByPropertyStr) throws Exception { + checkStarted(); clearIO(); + Map result = new HashMap<>(); + try { Filter filter = FilterImpl.createFilter(filterStr); - if (filter == null) { - return getMessageCount(); + SimpleString groupByProperty = SimpleString.toSimpleString(groupByPropertyStr); + if (filter == null && groupByProperty == null) { + result.put(null, getMessageCount()); } else { try (LinkedListIterator iterator = queue.browserIterator()) { - int count = 0; - try { while (iterator.hasNext()) { - MessageReference ref = iterator.next(); - if (filter.match(ref.getMessage())) { - count++; + Message message = iterator.next().getMessage(); + if (filter == null || filter.match(message)) { + if (groupByProperty == null) { + result.compute(null, (k,v) -> v == null ? 1 : ++v); + } else { + Object value = message.getObjectProperty(groupByProperty); + String valueStr = value == null ? null : value.toString(); + result.compute(valueStr, (k,v) -> v == null ? 1 : ++v); + } } } } catch (NoSuchElementException ignored) { // this could happen through paging browsing } - return count; } } - } finally { - blockOnIO(); - } - } - - @Override - public String countMessagesProperty(final String filter) throws Exception { - checkStarted(); - clearIO(); - try { - try (LinkedListIterator iterator = queue.browserIterator()) { - Map result = new HashMap<>(); - String propertySearch = filter == null ? UNDEFINED : filter; - try { - while (iterator.hasNext()) { - MessageReference ref = iterator.next(); - String messageProperty = ref.getMessage().getStringProperty(propertySearch); - messageProperty = messageProperty == null ? UNDEFINED : messageProperty; - Integer value = result.getOrDefault(messageProperty, 0); - result.put(messageProperty, ++value); - } - } catch (NoSuchElementException ignored) { - // this could happen through paging browsing - } - return JsonUtil.toJsonObject(result).toString(); - } + return result; } finally { blockOnIO(); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java index c3406672db..5ba0b39ba9 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java @@ -1795,7 +1795,7 @@ public class QueueControlTest extends ManagementTestBase { for (int i = 0; i < 100; i++) { ClientMessage msg = session.createMessage(false); - if(i % 3 == 0){ + if (i % 3 == 0) { msg.putStringProperty(key, valueGroup1); } else { msg.putStringProperty(key, valueGroup2); @@ -1810,11 +1810,11 @@ public class QueueControlTest extends ManagementTestBase { QueueControl queueControl = createManagementControl(address, queue); Assert.assertEquals(120, getMessageCount(queueControl)); - String result = queueControl.countMessagesProperty(key); + String result = queueControl.countMessages(null, key); JsonObject jsonObject = JsonUtil.readJsonObject(result); Assert.assertEquals(34, jsonObject.getInt(valueGroup1)); Assert.assertEquals(66, jsonObject.getInt(valueGroup2)); - Assert.assertEquals(20, jsonObject.getInt("*_UNDEFINED_*")); + Assert.assertEquals(20, jsonObject.getInt("null")); session.deleteQueue(queue); } @@ -1834,9 +1834,9 @@ public class QueueControlTest extends ManagementTestBase { QueueControl queueControl = createManagementControl(address, queue); Assert.assertEquals(100, getMessageCount(queueControl)); - String result = queueControl.countMessagesProperty(null); + String result = queueControl.countMessages(null,null); JsonObject jsonObject = JsonUtil.readJsonObject(result); - Assert.assertEquals(100, jsonObject.getInt("*_UNDEFINED_*")); + Assert.assertEquals(100, jsonObject.getInt("null")); session.deleteQueue(queue); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java index b0efb93f38..0a62334faf 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java @@ -102,8 +102,8 @@ public class QueueControlUsingCoreTest extends QueueControlTest { } @Override - public String countMessagesProperty(final String filter) throws Exception { - return (String) proxy.invokeOperation(String.class, "countMessagesProperty", filter); + public String countMessages(final String filter, final String groupByFilter) throws Exception { + return (String) proxy.invokeOperation(String.class, "countMessages", filter, groupByFilter); } @Override