From e15917129f92ecd1ee3e503d5896668b4743345d Mon Sep 17 00:00:00 2001 From: Arthur Fritz Santiago Date: Fri, 10 Aug 2018 11:49:16 -0300 Subject: [PATCH] [ARTEMIS-2022] Create count messages 'group by' this property filter --- .../api/core/management/QueueControl.java | 9 +++ .../management/impl/QueueControlImpl.java | 27 +++++++++ .../management/QueueControlTest.java | 60 +++++++++++++++++++ .../management/QueueControlUsingCoreTest.java | 5 ++ 4 files changed, 101 insertions(+) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java index 2aafcb144e..c2d7ba6a2a 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java @@ -296,6 +296,15 @@ public interface QueueControl { @Operation(desc = "Returns the number of the messages in the queue", impact = MBeanOperationInfo.INFO) long countMessages() throws Exception; + /** + * Counts the number of messages in this queue group by property received in the parameter using JSON serialization. + * In case no has property return amount of messages in the *_UNDEFINED_* + *
+ * Using {@code null} or an empty filter will count all messages from this queue. + */ + @Operation(desc = "Returns the number of the messages in the separate property value the queue", impact = MBeanOperationInfo.INFO) + String countMessagesProperty(@Parameter(name = "filter", desc = "This filter separate account messages") String filter) throws Exception; + /** * Removes the message corresponding to the specified message ID. * diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java index 16e948339e..24afd9fa0c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java @@ -63,6 +63,7 @@ import org.apache.activemq.artemis.utils.collections.LinkedListIterator; public class QueueControlImpl extends AbstractControl implements QueueControl { public static final int FLUSH_LIMIT = 500; + public static final String UNDEFINED = "*_UNDEFINED_*"; // Constants ----------------------------------------------------- // Attributes ---------------------------------------------------- @@ -711,6 +712,32 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { } } + @Override + public String countMessagesProperty(final String filter) throws Exception { + checkStarted(); + clearIO(); + try { + try (LinkedListIterator iterator = queue.browserIterator()) { + Map result = new HashMap<>(); + String propertySearch = filter == null ? UNDEFINED : filter; + try { + while (iterator.hasNext()) { + MessageReference ref = iterator.next(); + String messageProperty = ref.getMessage().getStringProperty(propertySearch); + messageProperty = messageProperty == null ? UNDEFINED : messageProperty ; + Integer value = result.getOrDefault(messageProperty, 0); + result.put(messageProperty, ++value); + } + } catch (NoSuchElementException ignored) { + // this could happen through paging browsing + } + return JsonUtil.toJsonObject(result).toString(); + } + } finally { + blockOnIO(); + } + } + @Override public boolean removeMessage(final long messageID) throws Exception { checkStarted(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java index 9a68badad3..c3406672db 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java @@ -1781,6 +1781,66 @@ public class QueueControlTest extends ManagementTestBase { session.deleteQueue(queue); } + @Test + public void testCountMessagesPropertyExist() throws Exception { + String key = new String("key_group"); + String valueGroup1 = "group_1"; + String valueGroup2 = "group_2"; + + SimpleString address = RandomUtil.randomSimpleString(); + SimpleString queue = RandomUtil.randomSimpleString(); + + session.createQueue(address, queue, null, false); + ClientProducer producer = session.createProducer(address); + + for (int i = 0; i < 100; i++) { + ClientMessage msg = session.createMessage(false); + if(i % 3 == 0){ + msg.putStringProperty(key, valueGroup1); + } else { + msg.putStringProperty(key, valueGroup2); + } + producer.send(msg); + } + + for (int i = 0; i < 20; i++) { + ClientMessage msg = session.createMessage(false); + producer.send(msg); + } + + QueueControl queueControl = createManagementControl(address, queue); + Assert.assertEquals(120, getMessageCount(queueControl)); + String result = queueControl.countMessagesProperty(key); + JsonObject jsonObject = JsonUtil.readJsonObject(result); + Assert.assertEquals(34, jsonObject.getInt(valueGroup1)); + Assert.assertEquals(66, jsonObject.getInt(valueGroup2)); + Assert.assertEquals(20, jsonObject.getInt("*_UNDEFINED_*")); + session.deleteQueue(queue); + } + + @Test + public void testCountMessagesPropertyWithNullFilter() throws Exception { + SimpleString address = RandomUtil.randomSimpleString(); + SimpleString queue = RandomUtil.randomSimpleString(); + + session.createQueue(address, queue, null, false); + ClientProducer producer = session.createProducer(address); + + for (int i = 0; i < 100; i++) { + ClientMessage msg = session.createMessage(false); + msg.putStringProperty(RandomUtil.randomString(), RandomUtil.randomString()); + producer.send(msg); + } + + QueueControl queueControl = createManagementControl(address, queue); + Assert.assertEquals(100, getMessageCount(queueControl)); + String result = queueControl.countMessagesProperty(null); + JsonObject jsonObject = JsonUtil.readJsonObject(result); + Assert.assertEquals(100, jsonObject.getInt("*_UNDEFINED_*")); + session.deleteQueue(queue); + } + + @Test public void testExpireMessagesWithFilter() throws Exception { SimpleString key = new SimpleString("key"); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java index dcdb2f108f..b0efb93f38 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java @@ -101,6 +101,11 @@ public class QueueControlUsingCoreTest extends QueueControlTest { return (Long) proxy.invokeOperation(Long.class, "countMessages"); } + @Override + public String countMessagesProperty(final String filter) throws Exception { + return (String) proxy.invokeOperation(String.class, "countMessagesProperty", filter); + } + @Override public boolean expireMessage(final long messageID) throws Exception { return (Boolean) proxy.invokeOperation("expireMessage", messageID);