ARTEMIS-2150 Counts the number of delivering messages in this queue

This commit is contained in:
Arthur Fritz Santiago 2018-10-26 15:32:34 -03:00 committed by Michael Andre Pearce
parent 3f3046c5ef
commit dba405c404
4 changed files with 262 additions and 11 deletions

View File

@ -311,6 +311,23 @@ public interface QueueControl {
@Operation(desc = "Returns the number of the messages in the queue matching the given filter, grouped by the given property field", impact = MBeanOperationInfo.INFO)
String countMessages(@Parameter(name = "filter", desc = "This filter separate account messages") String filter, @Parameter(name = "groupByProperty", desc = "This property to group by") String groupByProperty) throws Exception;
/**
* Counts the number of delivering messages in this queue matching the specified filter.
* <br>
* Using {@code null} or an empty filter will count <em>all</em> messages from this queue.
*/
@Operation(desc = "Returns the number of the messages in the queue matching the given filter")
long countDeliveringMessages(@Parameter(name = "filter", desc = "A message filter (can be empty)") String filter) throws Exception;
/**
* Counts the number of delivering messages in this queue matching the specified filter, grouped by the given property field.
* In case of null property will be grouped in "null"
* <br>
* Using {@code null} or an empty filter will count <em>all</em> messages from this queue.
*/
@Operation(desc = "Returns the number of the messages in the queue matching the given filter, grouped by the given property field")
String countDeliveringMessages(@Parameter(name = "filter", desc = "This filter separate account messages") String filter, @Parameter(name = "groupByProperty", desc = "This property to group by") String groupByProperty) throws Exception;
/**
* Removes the message corresponding to the specified message ID.
*

View File

@ -706,9 +706,10 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
private Map<String, Long> intenalCountMessages(final String filterStr, final String groupByPropertyStr) throws Exception {
checkStarted();
clearIO();
Map<String, Long> result = new HashMap<>();
clearIO();
Map<String, Long> result = new HashMap<>();
try {
Filter filter = FilterImpl.createFilter(filterStr);
SimpleString groupByProperty = SimpleString.toSimpleString(groupByPropertyStr);
@ -719,15 +720,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
try {
while (iterator.hasNext()) {
Message message = iterator.next().getMessage();
if (filter == null || filter.match(message)) {
if (groupByProperty == null) {
result.compute(null, (k,v) -> v == null ? 1 : ++v);
} else {
Object value = message.getObjectProperty(groupByProperty);
String valueStr = value == null ? null : value.toString();
result.compute(valueStr, (k,v) -> v == null ? 1 : ++v);
}
}
internalComputeMessage(result, filter, groupByProperty, message);
}
} catch (NoSuchElementException ignored) {
// this could happen through paging browsing
@ -740,6 +733,54 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
}
}
@Override
public long countDeliveringMessages(final String filterStr) throws Exception {
Long value = intenalCountDeliveryMessages(filterStr, null).get(null);
return value == null ? 0 : value;
}
@Override
public String countDeliveringMessages(final String filterStr, final String groupByProperty) throws Exception {
return JsonUtil.toJsonObject(intenalCountDeliveryMessages(filterStr, groupByProperty)).toString();
}
private Map<String, Long> intenalCountDeliveryMessages(final String filterStr, final String groupByPropertyStr) throws Exception {
checkStarted();
clearIO();
Map<String, Long> result = new HashMap<>();
try {
Filter filter = FilterImpl.createFilter(filterStr);
SimpleString groupByProperty = SimpleString.toSimpleString(groupByPropertyStr);
if (filter == null && groupByProperty == null) {
result.put(null, Long.valueOf(getDeliveringCount()));
} else {
Map<String, List<MessageReference>> deliveringMessages = queue.getDeliveringMessages();
deliveringMessages.forEach((s, messageReferenceList) ->
messageReferenceList.forEach(messageReference ->
internalComputeMessage(result, filter, groupByProperty, messageReference.getMessage())
));
}
return result;
} finally {
blockOnIO();
}
}
private void internalComputeMessage(Map<String, Long> result, Filter filter, SimpleString groupByProperty, Message message) {
if (filter == null || filter.match(message)) {
if (groupByProperty == null) {
result.compute(null, (k, v) -> v == null ? 1 : ++v);
} else {
Object value = message.getObjectProperty(groupByProperty);
String valueStr = value == null ? null : value.toString();
result.compute(valueStr, (k, v) -> v == null ? 1 : ++v);
}
}
}
@Override
public boolean removeMessage(final long messageID) throws Exception {
checkStarted();

View File

@ -1703,6 +1703,189 @@ public class QueueControlTest extends ManagementTestBase {
session.deleteQueue(queue);
}
@Test
public void testCountDeliveringMessageCountWithFilter() throws Exception {
SimpleString key = new SimpleString("key");
long matchingValue = RandomUtil.randomPositiveLong();
long unmatchingValue = matchingValue + 1;
SimpleString address = RandomUtil.randomSimpleString();
SimpleString queue = RandomUtil.randomSimpleString();
session.createQueue(address, queue, null, false);
ClientProducer producer = session.createProducer(address);
ClientMessage matchingMessage = session.createMessage(false);
matchingMessage.putLongProperty(key, matchingValue);
ClientMessage unmatchingMessage = session.createMessage(false);
unmatchingMessage.putLongProperty(key, unmatchingValue);
producer.send(matchingMessage);
producer.send(unmatchingMessage);
producer.send(matchingMessage);
QueueControl queueControl = createManagementControl(address, queue);
Assert.assertEquals(0, queueControl.countDeliveringMessages(null));
Assert.assertEquals(0, queueControl.countDeliveringMessages(key + " =" + matchingValue));
Assert.assertEquals(0, queueControl.countDeliveringMessages(key + " =" + unmatchingValue));
ClientConsumer consumer = session.createConsumer(queue, null, 1024 * 1024, 1, false);
ClientMessage message = consumer.receive(500);
Assert.assertNotNull(message);
Assert.assertEquals(3, queueControl.countDeliveringMessages(null));
Assert.assertEquals(2, queueControl.countDeliveringMessages(key + " =" + matchingValue));
Assert.assertEquals(1, queueControl.countDeliveringMessages(key + " =" + unmatchingValue));
consumer.close();
session.deleteQueue(queue);
}
@Test
public void testCountDeliveringMessageCountNoFilter() throws Exception {
SimpleString key = new SimpleString("key");
long matchingValue = RandomUtil.randomLong();
long unmatchingValue = matchingValue + 1;
SimpleString address = RandomUtil.randomSimpleString();
SimpleString queue = RandomUtil.randomSimpleString();
session.createQueue(address, queue, null, false);
ClientProducer producer = session.createProducer(address);
ClientMessage matchingMessage = session.createMessage(false);
matchingMessage.putLongProperty(key, matchingValue);
ClientMessage unmatchingMessage = session.createMessage(false);
unmatchingMessage.putLongProperty(key, unmatchingValue);
producer.send(matchingMessage);
producer.send(unmatchingMessage);
producer.send(matchingMessage);
QueueControl queueControl = createManagementControl(address, queue);
Assert.assertEquals(0, queueControl.countDeliveringMessages(null));
ClientConsumer consumer = session.createConsumer(queue, null, 1024 * 1024, 1, false);
ClientMessage message = consumer.receive(500);
Assert.assertNotNull(message);
Assert.assertEquals(3, queueControl.countDeliveringMessages(null));
consumer.close();
session.deleteQueue(queue);
}
@Test
public void testCountDeliveringMessageCountNoGroupNoFilter() throws Exception {
SimpleString key = new SimpleString("key");
long matchingValue = RandomUtil.randomLong();
long unmatchingValue = matchingValue + 1;
SimpleString address = RandomUtil.randomSimpleString();
SimpleString queue = RandomUtil.randomSimpleString();
session.createQueue(address, queue, null, false);
ClientProducer producer = session.createProducer(address);
ClientMessage matchingMessage = session.createMessage(false);
matchingMessage.putLongProperty(key, matchingValue);
ClientMessage unmatchingMessage = session.createMessage(false);
unmatchingMessage.putLongProperty(key, unmatchingValue);
producer.send(matchingMessage);
producer.send(unmatchingMessage);
producer.send(matchingMessage);
QueueControl queueControl = createManagementControl(address, queue);
String result = queueControl.countDeliveringMessages(null, null);
JsonObject jsonObject = JsonUtil.readJsonObject(result);
Assert.assertEquals(0, jsonObject.getInt("null"));
ClientConsumer consumer = session.createConsumer(queue, null, 1024 * 1024, 1, false);
ClientMessage message = consumer.receive(500);
Assert.assertNotNull(message);
result = queueControl.countDeliveringMessages(null, null);
jsonObject = JsonUtil.readJsonObject(result);
Assert.assertEquals(3, jsonObject.getInt("null"));
consumer.close();
session.deleteQueue(queue);
}
@Test
public void testCountDeliveringMessageCountGroupNoFilter() throws Exception {
String key = new String("key_group");
String valueGroup1 = "group_1";
String valueGroup2 = "group_2";
SimpleString address = RandomUtil.randomSimpleString();
SimpleString queue = RandomUtil.randomSimpleString();
session.createQueue(address, queue, null, false);
ClientProducer producer = session.createProducer(address);
ClientMessage message1 = session.createMessage(false);
message1.putStringProperty(key, valueGroup1);
ClientMessage message2 = session.createMessage(false);
message2.putStringProperty(key, valueGroup2);
producer.send(message1);
producer.send(message2);
producer.send(message1);
QueueControl queueControl = createManagementControl(address, queue);
String result = queueControl.countDeliveringMessages(null, key);
JsonObject jsonObject = JsonUtil.readJsonObject(result);
Assert.assertTrue(jsonObject.isEmpty());
ClientConsumer consumer = session.createConsumer(queue, null, 1024 * 1024, 1, false);
ClientMessage message = consumer.receive(500);
Assert.assertNotNull(message);
result = queueControl.countDeliveringMessages(null, key);
jsonObject = JsonUtil.readJsonObject(result);
Assert.assertEquals(2, jsonObject.getInt(valueGroup1));
Assert.assertEquals(1, jsonObject.getInt(valueGroup2));
Assert.assertFalse(jsonObject.containsKey(null));
consumer.close();
session.deleteQueue(queue);
}
@Test
public void testCountDeliveringMessageCountGroupFilter() throws Exception {
String key = new String("key_group");
long valueGroup1 = RandomUtil.randomLong();
long valueGroup2 = valueGroup1 + 1;
SimpleString address = RandomUtil.randomSimpleString();
SimpleString queue = RandomUtil.randomSimpleString();
session.createQueue(address, queue, null, false);
ClientProducer producer = session.createProducer(address);
ClientMessage message1 = session.createMessage(false);
message1.putLongProperty(key, valueGroup1);
ClientMessage message2 = session.createMessage(false);
message2.putLongProperty(key, valueGroup2);
producer.send(message1);
producer.send(message2);
producer.send(message1);
QueueControl queueControl = createManagementControl(address, queue);
String result = queueControl.countDeliveringMessages(key + " =" + valueGroup1, key);
JsonObject jsonObject = JsonUtil.readJsonObject(result);
Assert.assertTrue(jsonObject.isEmpty());
ClientConsumer consumer = session.createConsumer(queue, null, 1024 * 1024, 1, false);
ClientMessage message = consumer.receive(500);
Assert.assertNotNull(message);
result = queueControl.countDeliveringMessages(key + " =" + valueGroup1, key);
jsonObject = JsonUtil.readJsonObject(result);
Assert.assertEquals(2, jsonObject.getInt(String.valueOf(valueGroup1)));
Assert.assertFalse(jsonObject.containsKey(String.valueOf(valueGroup2)));
Assert.assertFalse(jsonObject.containsKey(null));
consumer.close();
session.deleteQueue(queue);
}
@Test
public void testCountMessagesWithFilter() throws Exception {
SimpleString key = new SimpleString("key");

View File

@ -106,6 +106,16 @@ public class QueueControlUsingCoreTest extends QueueControlTest {
return (String) proxy.invokeOperation(String.class, "countMessages", filter, groupByFilter);
}
@Override
public long countDeliveringMessages(final String filter) throws Exception {
return (Long) proxy.invokeOperation(Long.class, "countDeliveringMessages", filter);
}
@Override
public String countDeliveringMessages(final String filter, final String groupByFilter) throws Exception {
return (String) proxy.invokeOperation(String.class, "countDeliveringProperty", filter, groupByFilter);
}
@Override
public boolean expireMessage(final long messageID) throws Exception {
return (Boolean) proxy.invokeOperation("expireMessage", messageID);