This commit is contained in:
Michael Andre Pearce 2018-08-13 08:37:56 +01:00
commit 6f0988dc55
5 changed files with 101 additions and 10 deletions

View File

@ -258,7 +258,7 @@ public final class JsonUtil {
JsonObjectBuilder jsonObjectBuilder = JsonLoader.createObjectBuilder();
if (map != null) {
for (Map.Entry<String, ?> entry : map.entrySet()) {
addToObject(entry.getKey(), entry.getValue(), jsonObjectBuilder);
addToObject(String.valueOf(entry.getKey()), entry.getValue(), jsonObjectBuilder);
}
}
return jsonObjectBuilder.build();

View File

@ -296,6 +296,15 @@ public interface QueueControl {
@Operation(desc = "Returns the number of the messages in the queue", impact = MBeanOperationInfo.INFO)
long countMessages() throws Exception;
/**
* Counts the number of messages in this queue matching the specified filter, grouped by the given property field.
* In case of null property will be grouped in "null"
* <br>
* Using {@code null} or an empty filter will count <em>all</em> messages from this queue.
*/
@Operation(desc = "Returns the number of the messages in the queue matching the given filter, grouped by the given property field", impact = MBeanOperationInfo.INFO)
String countMessages(@Parameter(name = "filter", desc = "This filter separate account messages") String filter, @Parameter(name = "groupByProperty", desc = "This property to group by") String groupByProperty) throws Exception;
/**
* Removes the message corresponding to the specified message ID.
*

View File

@ -63,6 +63,7 @@ import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
public class QueueControlImpl extends AbstractControl implements QueueControl {
public static final int FLUSH_LIMIT = 500;
// Constants -----------------------------------------------------
// Attributes ----------------------------------------------------
@ -682,30 +683,46 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
@Override
public long countMessages(final String filterStr) throws Exception {
checkStarted();
Long value = intenalCountMessages(filterStr, null).get(null);
return value == null ? 0 : value;
}
@Override
public String countMessages(final String filterStr, final String groupByProperty) throws Exception {
return JsonUtil.toJsonObject(intenalCountMessages(filterStr, groupByProperty)).toString();
}
private Map<String, Long> intenalCountMessages(final String filterStr, final String groupByPropertyStr) throws Exception {
checkStarted();
clearIO();
Map<String, Long> result = new HashMap<>();
try {
Filter filter = FilterImpl.createFilter(filterStr);
if (filter == null) {
return getMessageCount();
SimpleString groupByProperty = SimpleString.toSimpleString(groupByPropertyStr);
if (filter == null && groupByProperty == null) {
result.put(null, getMessageCount());
} else {
try (LinkedListIterator<MessageReference> iterator = queue.browserIterator()) {
int count = 0;
try {
while (iterator.hasNext()) {
MessageReference ref = iterator.next();
if (filter.match(ref.getMessage())) {
count++;
Message message = iterator.next().getMessage();
if (filter == null || filter.match(message)) {
if (groupByProperty == null) {
result.compute(null, (k,v) -> v == null ? 1 : ++v);
} else {
Object value = message.getObjectProperty(groupByProperty);
String valueStr = value == null ? null : value.toString();
result.compute(valueStr, (k,v) -> v == null ? 1 : ++v);
}
}
}
} catch (NoSuchElementException ignored) {
// this could happen through paging browsing
}
return count;
}
}
return result;
} finally {
blockOnIO();
}

View File

@ -1781,6 +1781,66 @@ public class QueueControlTest extends ManagementTestBase {
session.deleteQueue(queue);
}
@Test
public void testCountMessagesPropertyExist() throws Exception {
String key = new String("key_group");
String valueGroup1 = "group_1";
String valueGroup2 = "group_2";
SimpleString address = RandomUtil.randomSimpleString();
SimpleString queue = RandomUtil.randomSimpleString();
session.createQueue(address, queue, null, false);
ClientProducer producer = session.createProducer(address);
for (int i = 0; i < 100; i++) {
ClientMessage msg = session.createMessage(false);
if (i % 3 == 0) {
msg.putStringProperty(key, valueGroup1);
} else {
msg.putStringProperty(key, valueGroup2);
}
producer.send(msg);
}
for (int i = 0; i < 20; i++) {
ClientMessage msg = session.createMessage(false);
producer.send(msg);
}
QueueControl queueControl = createManagementControl(address, queue);
Assert.assertEquals(120, getMessageCount(queueControl));
String result = queueControl.countMessages(null, key);
JsonObject jsonObject = JsonUtil.readJsonObject(result);
Assert.assertEquals(34, jsonObject.getInt(valueGroup1));
Assert.assertEquals(66, jsonObject.getInt(valueGroup2));
Assert.assertEquals(20, jsonObject.getInt("null"));
session.deleteQueue(queue);
}
@Test
public void testCountMessagesPropertyWithNullFilter() throws Exception {
SimpleString address = RandomUtil.randomSimpleString();
SimpleString queue = RandomUtil.randomSimpleString();
session.createQueue(address, queue, null, false);
ClientProducer producer = session.createProducer(address);
for (int i = 0; i < 100; i++) {
ClientMessage msg = session.createMessage(false);
msg.putStringProperty(RandomUtil.randomString(), RandomUtil.randomString());
producer.send(msg);
}
QueueControl queueControl = createManagementControl(address, queue);
Assert.assertEquals(100, getMessageCount(queueControl));
String result = queueControl.countMessages(null,null);
JsonObject jsonObject = JsonUtil.readJsonObject(result);
Assert.assertEquals(100, jsonObject.getInt("null"));
session.deleteQueue(queue);
}
@Test
public void testExpireMessagesWithFilter() throws Exception {
SimpleString key = new SimpleString("key");

View File

@ -101,6 +101,11 @@ public class QueueControlUsingCoreTest extends QueueControlTest {
return (Long) proxy.invokeOperation(Long.class, "countMessages");
}
@Override
public String countMessages(final String filter, final String groupByFilter) throws Exception {
return (String) proxy.invokeOperation(String.class, "countMessages", filter, groupByFilter);
}
@Override
public boolean expireMessage(final long messageID) throws Exception {
return (Boolean) proxy.invokeOperation("expireMessage", messageID);