[ARTEMIS-2022] Create count messages 'group by' this property filter
This commit is contained in:
parent
2ec8927ae7
commit
e15917129f
|
@ -296,6 +296,15 @@ public interface QueueControl {
|
|||
@Operation(desc = "Returns the number of the messages in the queue", impact = MBeanOperationInfo.INFO)
|
||||
long countMessages() throws Exception;
|
||||
|
||||
/**
|
||||
* Counts the number of messages in this queue group by property received in the parameter using JSON serialization.
|
||||
* In case no has property return amount of messages in the *_UNDEFINED_*
|
||||
* <br>
|
||||
* Using {@code null} or an empty filter will count <em>all</em> messages from this queue.
|
||||
*/
|
||||
@Operation(desc = "Returns the number of the messages in the separate property value the queue", impact = MBeanOperationInfo.INFO)
|
||||
String countMessagesProperty(@Parameter(name = "filter", desc = "This filter separate account messages") String filter) throws Exception;
|
||||
|
||||
/**
|
||||
* Removes the message corresponding to the specified message ID.
|
||||
*
|
||||
|
|
|
@ -63,6 +63,7 @@ import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
|
|||
public class QueueControlImpl extends AbstractControl implements QueueControl {
|
||||
|
||||
public static final int FLUSH_LIMIT = 500;
|
||||
public static final String UNDEFINED = "*_UNDEFINED_*";
|
||||
// Constants -----------------------------------------------------
|
||||
|
||||
// Attributes ----------------------------------------------------
|
||||
|
@ -711,6 +712,32 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String countMessagesProperty(final String filter) throws Exception {
|
||||
checkStarted();
|
||||
clearIO();
|
||||
try {
|
||||
try (LinkedListIterator<MessageReference> iterator = queue.browserIterator()) {
|
||||
Map<String, Integer> result = new HashMap<>();
|
||||
String propertySearch = filter == null ? UNDEFINED : filter;
|
||||
try {
|
||||
while (iterator.hasNext()) {
|
||||
MessageReference ref = iterator.next();
|
||||
String messageProperty = ref.getMessage().getStringProperty(propertySearch);
|
||||
messageProperty = messageProperty == null ? UNDEFINED : messageProperty ;
|
||||
Integer value = result.getOrDefault(messageProperty, 0);
|
||||
result.put(messageProperty, ++value);
|
||||
}
|
||||
} catch (NoSuchElementException ignored) {
|
||||
// this could happen through paging browsing
|
||||
}
|
||||
return JsonUtil.toJsonObject(result).toString();
|
||||
}
|
||||
} finally {
|
||||
blockOnIO();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean removeMessage(final long messageID) throws Exception {
|
||||
checkStarted();
|
||||
|
|
|
@ -1781,6 +1781,66 @@ public class QueueControlTest extends ManagementTestBase {
|
|||
session.deleteQueue(queue);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCountMessagesPropertyExist() throws Exception {
|
||||
String key = new String("key_group");
|
||||
String valueGroup1 = "group_1";
|
||||
String valueGroup2 = "group_2";
|
||||
|
||||
SimpleString address = RandomUtil.randomSimpleString();
|
||||
SimpleString queue = RandomUtil.randomSimpleString();
|
||||
|
||||
session.createQueue(address, queue, null, false);
|
||||
ClientProducer producer = session.createProducer(address);
|
||||
|
||||
for (int i = 0; i < 100; i++) {
|
||||
ClientMessage msg = session.createMessage(false);
|
||||
if(i % 3 == 0){
|
||||
msg.putStringProperty(key, valueGroup1);
|
||||
} else {
|
||||
msg.putStringProperty(key, valueGroup2);
|
||||
}
|
||||
producer.send(msg);
|
||||
}
|
||||
|
||||
for (int i = 0; i < 20; i++) {
|
||||
ClientMessage msg = session.createMessage(false);
|
||||
producer.send(msg);
|
||||
}
|
||||
|
||||
QueueControl queueControl = createManagementControl(address, queue);
|
||||
Assert.assertEquals(120, getMessageCount(queueControl));
|
||||
String result = queueControl.countMessagesProperty(key);
|
||||
JsonObject jsonObject = JsonUtil.readJsonObject(result);
|
||||
Assert.assertEquals(34, jsonObject.getInt(valueGroup1));
|
||||
Assert.assertEquals(66, jsonObject.getInt(valueGroup2));
|
||||
Assert.assertEquals(20, jsonObject.getInt("*_UNDEFINED_*"));
|
||||
session.deleteQueue(queue);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCountMessagesPropertyWithNullFilter() throws Exception {
|
||||
SimpleString address = RandomUtil.randomSimpleString();
|
||||
SimpleString queue = RandomUtil.randomSimpleString();
|
||||
|
||||
session.createQueue(address, queue, null, false);
|
||||
ClientProducer producer = session.createProducer(address);
|
||||
|
||||
for (int i = 0; i < 100; i++) {
|
||||
ClientMessage msg = session.createMessage(false);
|
||||
msg.putStringProperty(RandomUtil.randomString(), RandomUtil.randomString());
|
||||
producer.send(msg);
|
||||
}
|
||||
|
||||
QueueControl queueControl = createManagementControl(address, queue);
|
||||
Assert.assertEquals(100, getMessageCount(queueControl));
|
||||
String result = queueControl.countMessagesProperty(null);
|
||||
JsonObject jsonObject = JsonUtil.readJsonObject(result);
|
||||
Assert.assertEquals(100, jsonObject.getInt("*_UNDEFINED_*"));
|
||||
session.deleteQueue(queue);
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testExpireMessagesWithFilter() throws Exception {
|
||||
SimpleString key = new SimpleString("key");
|
||||
|
|
|
@ -101,6 +101,11 @@ public class QueueControlUsingCoreTest extends QueueControlTest {
|
|||
return (Long) proxy.invokeOperation(Long.class, "countMessages");
|
||||
}
|
||||
|
||||
@Override
|
||||
public String countMessagesProperty(final String filter) throws Exception {
|
||||
return (String) proxy.invokeOperation(String.class, "countMessagesProperty", filter);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean expireMessage(final long messageID) throws Exception {
|
||||
return (Boolean) proxy.invokeOperation("expireMessage", messageID);
|
||||
|
|
Loading…
Reference in New Issue