diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
index b210530f1c..b57be21e6c 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
@@ -311,6 +311,23 @@ public interface QueueControl {
@Operation(desc = "Returns the number of the messages in the queue matching the given filter, grouped by the given property field", impact = MBeanOperationInfo.INFO)
String countMessages(@Parameter(name = "filter", desc = "This filter separate account messages") String filter, @Parameter(name = "groupByProperty", desc = "This property to group by") String groupByProperty) throws Exception;
+ /**
+ * Counts the number of delivering messages in this queue matching the specified filter.
+ *
+ * Using {@code null} or an empty filter will count all messages from this queue.
+ */
+ @Operation(desc = "Returns the number of the messages in the queue matching the given filter")
+ long countDeliveringMessages(@Parameter(name = "filter", desc = "A message filter (can be empty)") String filter) throws Exception;
+
+ /**
+ * Counts the number of delivering messages in this queue matching the specified filter, grouped by the given property field.
+ * In case of null property will be grouped in "null"
+ *
+ * Using {@code null} or an empty filter will count all messages from this queue.
+ */
+ @Operation(desc = "Returns the number of the messages in the queue matching the given filter, grouped by the given property field")
+ String countDeliveringMessages(@Parameter(name = "filter", desc = "This filter separate account messages") String filter, @Parameter(name = "groupByProperty", desc = "This property to group by") String groupByProperty) throws Exception;
+
/**
* Removes the message corresponding to the specified message ID.
*
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
index 3db5caef58..d62978f2e6 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
@@ -706,9 +706,10 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
private Map intenalCountMessages(final String filterStr, final String groupByPropertyStr) throws Exception {
checkStarted();
- clearIO();
- Map result = new HashMap<>();
+ clearIO();
+
+ Map result = new HashMap<>();
try {
Filter filter = FilterImpl.createFilter(filterStr);
SimpleString groupByProperty = SimpleString.toSimpleString(groupByPropertyStr);
@@ -719,15 +720,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
try {
while (iterator.hasNext()) {
Message message = iterator.next().getMessage();
- if (filter == null || filter.match(message)) {
- if (groupByProperty == null) {
- result.compute(null, (k,v) -> v == null ? 1 : ++v);
- } else {
- Object value = message.getObjectProperty(groupByProperty);
- String valueStr = value == null ? null : value.toString();
- result.compute(valueStr, (k,v) -> v == null ? 1 : ++v);
- }
- }
+ internalComputeMessage(result, filter, groupByProperty, message);
}
} catch (NoSuchElementException ignored) {
// this could happen through paging browsing
@@ -740,6 +733,54 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
}
}
+ @Override
+ public long countDeliveringMessages(final String filterStr) throws Exception {
+ Long value = intenalCountDeliveryMessages(filterStr, null).get(null);
+ return value == null ? 0 : value;
+ }
+
+ @Override
+ public String countDeliveringMessages(final String filterStr, final String groupByProperty) throws Exception {
+ return JsonUtil.toJsonObject(intenalCountDeliveryMessages(filterStr, groupByProperty)).toString();
+ }
+
+ private Map intenalCountDeliveryMessages(final String filterStr, final String groupByPropertyStr) throws Exception {
+ checkStarted();
+
+ clearIO();
+
+ Map result = new HashMap<>();
+ try {
+ Filter filter = FilterImpl.createFilter(filterStr);
+ SimpleString groupByProperty = SimpleString.toSimpleString(groupByPropertyStr);
+ if (filter == null && groupByProperty == null) {
+ result.put(null, Long.valueOf(getDeliveringCount()));
+ } else {
+ Map> deliveringMessages = queue.getDeliveringMessages();
+ deliveringMessages.forEach((s, messageReferenceList) ->
+ messageReferenceList.forEach(messageReference ->
+ internalComputeMessage(result, filter, groupByProperty, messageReference.getMessage())
+ ));
+ }
+ return result;
+ } finally {
+ blockOnIO();
+ }
+ }
+
+ private void internalComputeMessage(Map result, Filter filter, SimpleString groupByProperty, Message message) {
+ if (filter == null || filter.match(message)) {
+ if (groupByProperty == null) {
+ result.compute(null, (k, v) -> v == null ? 1 : ++v);
+ } else {
+ Object value = message.getObjectProperty(groupByProperty);
+ String valueStr = value == null ? null : value.toString();
+ result.compute(valueStr, (k, v) -> v == null ? 1 : ++v);
+ }
+ }
+ }
+
+
@Override
public boolean removeMessage(final long messageID) throws Exception {
checkStarted();
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
index 5ba0b39ba9..d64eba69b2 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
@@ -1703,6 +1703,189 @@ public class QueueControlTest extends ManagementTestBase {
session.deleteQueue(queue);
}
+ @Test
+ public void testCountDeliveringMessageCountWithFilter() throws Exception {
+ SimpleString key = new SimpleString("key");
+ long matchingValue = RandomUtil.randomPositiveLong();
+ long unmatchingValue = matchingValue + 1;
+
+ SimpleString address = RandomUtil.randomSimpleString();
+ SimpleString queue = RandomUtil.randomSimpleString();
+
+ session.createQueue(address, queue, null, false);
+ ClientProducer producer = session.createProducer(address);
+
+ ClientMessage matchingMessage = session.createMessage(false);
+ matchingMessage.putLongProperty(key, matchingValue);
+ ClientMessage unmatchingMessage = session.createMessage(false);
+ unmatchingMessage.putLongProperty(key, unmatchingValue);
+ producer.send(matchingMessage);
+ producer.send(unmatchingMessage);
+ producer.send(matchingMessage);
+
+ QueueControl queueControl = createManagementControl(address, queue);
+ Assert.assertEquals(0, queueControl.countDeliveringMessages(null));
+ Assert.assertEquals(0, queueControl.countDeliveringMessages(key + " =" + matchingValue));
+ Assert.assertEquals(0, queueControl.countDeliveringMessages(key + " =" + unmatchingValue));
+
+ ClientConsumer consumer = session.createConsumer(queue, null, 1024 * 1024, 1, false);
+ ClientMessage message = consumer.receive(500);
+ Assert.assertNotNull(message);
+ Assert.assertEquals(3, queueControl.countDeliveringMessages(null));
+ Assert.assertEquals(2, queueControl.countDeliveringMessages(key + " =" + matchingValue));
+ Assert.assertEquals(1, queueControl.countDeliveringMessages(key + " =" + unmatchingValue));
+
+ consumer.close();
+ session.deleteQueue(queue);
+ }
+
+ @Test
+ public void testCountDeliveringMessageCountNoFilter() throws Exception {
+ SimpleString key = new SimpleString("key");
+ long matchingValue = RandomUtil.randomLong();
+ long unmatchingValue = matchingValue + 1;
+
+ SimpleString address = RandomUtil.randomSimpleString();
+ SimpleString queue = RandomUtil.randomSimpleString();
+
+ session.createQueue(address, queue, null, false);
+ ClientProducer producer = session.createProducer(address);
+
+ ClientMessage matchingMessage = session.createMessage(false);
+ matchingMessage.putLongProperty(key, matchingValue);
+ ClientMessage unmatchingMessage = session.createMessage(false);
+ unmatchingMessage.putLongProperty(key, unmatchingValue);
+ producer.send(matchingMessage);
+ producer.send(unmatchingMessage);
+ producer.send(matchingMessage);
+
+ QueueControl queueControl = createManagementControl(address, queue);
+ Assert.assertEquals(0, queueControl.countDeliveringMessages(null));
+
+ ClientConsumer consumer = session.createConsumer(queue, null, 1024 * 1024, 1, false);
+ ClientMessage message = consumer.receive(500);
+ Assert.assertNotNull(message);
+ Assert.assertEquals(3, queueControl.countDeliveringMessages(null));
+
+ consumer.close();
+ session.deleteQueue(queue);
+ }
+
+ @Test
+ public void testCountDeliveringMessageCountNoGroupNoFilter() throws Exception {
+ SimpleString key = new SimpleString("key");
+ long matchingValue = RandomUtil.randomLong();
+ long unmatchingValue = matchingValue + 1;
+
+ SimpleString address = RandomUtil.randomSimpleString();
+ SimpleString queue = RandomUtil.randomSimpleString();
+
+ session.createQueue(address, queue, null, false);
+ ClientProducer producer = session.createProducer(address);
+
+ ClientMessage matchingMessage = session.createMessage(false);
+ matchingMessage.putLongProperty(key, matchingValue);
+ ClientMessage unmatchingMessage = session.createMessage(false);
+ unmatchingMessage.putLongProperty(key, unmatchingValue);
+ producer.send(matchingMessage);
+ producer.send(unmatchingMessage);
+ producer.send(matchingMessage);
+
+ QueueControl queueControl = createManagementControl(address, queue);
+ String result = queueControl.countDeliveringMessages(null, null);
+ JsonObject jsonObject = JsonUtil.readJsonObject(result);
+ Assert.assertEquals(0, jsonObject.getInt("null"));
+
+ ClientConsumer consumer = session.createConsumer(queue, null, 1024 * 1024, 1, false);
+ ClientMessage message = consumer.receive(500);
+ Assert.assertNotNull(message);
+
+ result = queueControl.countDeliveringMessages(null, null);
+ jsonObject = JsonUtil.readJsonObject(result);
+ Assert.assertEquals(3, jsonObject.getInt("null"));
+
+ consumer.close();
+ session.deleteQueue(queue);
+ }
+
+ @Test
+ public void testCountDeliveringMessageCountGroupNoFilter() throws Exception {
+ String key = new String("key_group");
+ String valueGroup1 = "group_1";
+ String valueGroup2 = "group_2";
+
+ SimpleString address = RandomUtil.randomSimpleString();
+ SimpleString queue = RandomUtil.randomSimpleString();
+
+ session.createQueue(address, queue, null, false);
+ ClientProducer producer = session.createProducer(address);
+
+ ClientMessage message1 = session.createMessage(false);
+ message1.putStringProperty(key, valueGroup1);
+ ClientMessage message2 = session.createMessage(false);
+ message2.putStringProperty(key, valueGroup2);
+ producer.send(message1);
+ producer.send(message2);
+ producer.send(message1);
+
+ QueueControl queueControl = createManagementControl(address, queue);
+ String result = queueControl.countDeliveringMessages(null, key);
+ JsonObject jsonObject = JsonUtil.readJsonObject(result);
+ Assert.assertTrue(jsonObject.isEmpty());
+
+ ClientConsumer consumer = session.createConsumer(queue, null, 1024 * 1024, 1, false);
+ ClientMessage message = consumer.receive(500);
+ Assert.assertNotNull(message);
+
+ result = queueControl.countDeliveringMessages(null, key);
+ jsonObject = JsonUtil.readJsonObject(result);
+ Assert.assertEquals(2, jsonObject.getInt(valueGroup1));
+ Assert.assertEquals(1, jsonObject.getInt(valueGroup2));
+ Assert.assertFalse(jsonObject.containsKey(null));
+
+ consumer.close();
+ session.deleteQueue(queue);
+ }
+
+ @Test
+ public void testCountDeliveringMessageCountGroupFilter() throws Exception {
+ String key = new String("key_group");
+ long valueGroup1 = RandomUtil.randomLong();
+ long valueGroup2 = valueGroup1 + 1;
+
+ SimpleString address = RandomUtil.randomSimpleString();
+ SimpleString queue = RandomUtil.randomSimpleString();
+
+ session.createQueue(address, queue, null, false);
+ ClientProducer producer = session.createProducer(address);
+
+ ClientMessage message1 = session.createMessage(false);
+ message1.putLongProperty(key, valueGroup1);
+ ClientMessage message2 = session.createMessage(false);
+ message2.putLongProperty(key, valueGroup2);
+ producer.send(message1);
+ producer.send(message2);
+ producer.send(message1);
+
+ QueueControl queueControl = createManagementControl(address, queue);
+ String result = queueControl.countDeliveringMessages(key + " =" + valueGroup1, key);
+ JsonObject jsonObject = JsonUtil.readJsonObject(result);
+ Assert.assertTrue(jsonObject.isEmpty());
+
+ ClientConsumer consumer = session.createConsumer(queue, null, 1024 * 1024, 1, false);
+ ClientMessage message = consumer.receive(500);
+ Assert.assertNotNull(message);
+
+ result = queueControl.countDeliveringMessages(key + " =" + valueGroup1, key);
+ jsonObject = JsonUtil.readJsonObject(result);
+ Assert.assertEquals(2, jsonObject.getInt(String.valueOf(valueGroup1)));
+ Assert.assertFalse(jsonObject.containsKey(String.valueOf(valueGroup2)));
+ Assert.assertFalse(jsonObject.containsKey(null));
+
+ consumer.close();
+ session.deleteQueue(queue);
+ }
+
@Test
public void testCountMessagesWithFilter() throws Exception {
SimpleString key = new SimpleString("key");
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
index 26ada034f9..1f2032aacb 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
@@ -106,6 +106,16 @@ public class QueueControlUsingCoreTest extends QueueControlTest {
return (String) proxy.invokeOperation(String.class, "countMessages", filter, groupByFilter);
}
+ @Override
+ public long countDeliveringMessages(final String filter) throws Exception {
+ return (Long) proxy.invokeOperation(Long.class, "countDeliveringMessages", filter);
+ }
+
+ @Override
+ public String countDeliveringMessages(final String filter, final String groupByFilter) throws Exception {
+ return (String) proxy.invokeOperation(String.class, "countDeliveringProperty", filter, groupByFilter);
+ }
+
@Override
public boolean expireMessage(final long messageID) throws Exception {
return (Boolean) proxy.invokeOperation("expireMessage", messageID);