ARTEMIS-2022 - Enhancements
Fix checkstyle Avoid duplicated logic Ability to filter and group Instantiate SimpleString property key once Get property value via getObjectProprty to ensure all special mapped properties such as in AMQPMessage would return Avoid a custom string to represent null, instead rely on Java's representation "null" by using Objects.toString to get the string value of the property value used to group by.
This commit is contained in:
parent
349477ed31
commit
24a28da09f
|
@ -258,7 +258,7 @@ public final class JsonUtil {
|
||||||
JsonObjectBuilder jsonObjectBuilder = JsonLoader.createObjectBuilder();
|
JsonObjectBuilder jsonObjectBuilder = JsonLoader.createObjectBuilder();
|
||||||
if (map != null) {
|
if (map != null) {
|
||||||
for (Map.Entry<String, ?> entry : map.entrySet()) {
|
for (Map.Entry<String, ?> entry : map.entrySet()) {
|
||||||
addToObject(entry.getKey(), entry.getValue(), jsonObjectBuilder);
|
addToObject(String.valueOf(entry.getKey()), entry.getValue(), jsonObjectBuilder);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return jsonObjectBuilder.build();
|
return jsonObjectBuilder.build();
|
||||||
|
|
|
@ -297,13 +297,13 @@ public interface QueueControl {
|
||||||
long countMessages() throws Exception;
|
long countMessages() throws Exception;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Counts the number of messages in this queue group by property received in the parameter using JSON serialization.
|
* Counts the number of messages in this queue matching the specified filter, grouped by the given property field.
|
||||||
* In case no has property return amount of messages in the *_UNDEFINED_*
|
* In case of null property will be grouped in "null"
|
||||||
* <br>
|
* <br>
|
||||||
* Using {@code null} or an empty filter will count <em>all</em> messages from this queue.
|
* Using {@code null} or an empty filter will count <em>all</em> messages from this queue.
|
||||||
*/
|
*/
|
||||||
@Operation(desc = "Returns the number of the messages in the separate property value the queue", impact = MBeanOperationInfo.INFO)
|
@Operation(desc = "Returns the number of the messages in the queue matching the given filter, grouped by the given property field", impact = MBeanOperationInfo.INFO)
|
||||||
String countMessagesProperty(@Parameter(name = "filter", desc = "This filter separate account messages") String filter) throws Exception;
|
String countMessages(@Parameter(name = "filter", desc = "This filter separate account messages") String filter, @Parameter(name = "groupByProperty", desc = "This property to group by") String groupByProperty) throws Exception;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Removes the message corresponding to the specified message ID.
|
* Removes the message corresponding to the specified message ID.
|
||||||
|
|
|
@ -63,7 +63,7 @@ import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
|
||||||
public class QueueControlImpl extends AbstractControl implements QueueControl {
|
public class QueueControlImpl extends AbstractControl implements QueueControl {
|
||||||
|
|
||||||
public static final int FLUSH_LIMIT = 500;
|
public static final int FLUSH_LIMIT = 500;
|
||||||
public static final String UNDEFINED = "*_UNDEFINED_*";
|
|
||||||
// Constants -----------------------------------------------------
|
// Constants -----------------------------------------------------
|
||||||
|
|
||||||
// Attributes ----------------------------------------------------
|
// Attributes ----------------------------------------------------
|
||||||
|
@ -683,56 +683,46 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public long countMessages(final String filterStr) throws Exception {
|
public long countMessages(final String filterStr) throws Exception {
|
||||||
checkStarted();
|
Long value = intenalCountMessages(filterStr, null).get(null);
|
||||||
|
return value == null ? 0 : value;
|
||||||
clearIO();
|
|
||||||
try {
|
|
||||||
Filter filter = FilterImpl.createFilter(filterStr);
|
|
||||||
if (filter == null) {
|
|
||||||
return getMessageCount();
|
|
||||||
} else {
|
|
||||||
try (LinkedListIterator<MessageReference> iterator = queue.browserIterator()) {
|
|
||||||
int count = 0;
|
|
||||||
|
|
||||||
try {
|
|
||||||
while (iterator.hasNext()) {
|
|
||||||
MessageReference ref = iterator.next();
|
|
||||||
if (filter.match(ref.getMessage())) {
|
|
||||||
count++;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} catch (NoSuchElementException ignored) {
|
|
||||||
// this could happen through paging browsing
|
|
||||||
}
|
|
||||||
return count;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
blockOnIO();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String countMessagesProperty(final String filter) throws Exception {
|
public String countMessages(final String filterStr, final String groupByProperty) throws Exception {
|
||||||
|
return JsonUtil.toJsonObject(intenalCountMessages(filterStr, groupByProperty)).toString();
|
||||||
|
}
|
||||||
|
|
||||||
|
private Map<String, Long> intenalCountMessages(final String filterStr, final String groupByPropertyStr) throws Exception {
|
||||||
checkStarted();
|
checkStarted();
|
||||||
clearIO();
|
clearIO();
|
||||||
|
Map<String, Long> result = new HashMap<>();
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
Filter filter = FilterImpl.createFilter(filterStr);
|
||||||
|
SimpleString groupByProperty = SimpleString.toSimpleString(groupByPropertyStr);
|
||||||
|
if (filter == null && groupByProperty == null) {
|
||||||
|
result.put(null, getMessageCount());
|
||||||
|
} else {
|
||||||
try (LinkedListIterator<MessageReference> iterator = queue.browserIterator()) {
|
try (LinkedListIterator<MessageReference> iterator = queue.browserIterator()) {
|
||||||
Map<String, Integer> result = new HashMap<>();
|
|
||||||
String propertySearch = filter == null ? UNDEFINED : filter;
|
|
||||||
try {
|
try {
|
||||||
while (iterator.hasNext()) {
|
while (iterator.hasNext()) {
|
||||||
MessageReference ref = iterator.next();
|
Message message = iterator.next().getMessage();
|
||||||
String messageProperty = ref.getMessage().getStringProperty(propertySearch);
|
if (filter == null || filter.match(message)) {
|
||||||
messageProperty = messageProperty == null ? UNDEFINED : messageProperty;
|
if (groupByProperty == null) {
|
||||||
Integer value = result.getOrDefault(messageProperty, 0);
|
result.compute(null, (k,v) -> v == null ? 1 : ++v);
|
||||||
result.put(messageProperty, ++value);
|
} else {
|
||||||
|
Object value = message.getObjectProperty(groupByProperty);
|
||||||
|
String valueStr = value == null ? null : value.toString();
|
||||||
|
result.compute(valueStr, (k,v) -> v == null ? 1 : ++v);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} catch (NoSuchElementException ignored) {
|
} catch (NoSuchElementException ignored) {
|
||||||
// this could happen through paging browsing
|
// this could happen through paging browsing
|
||||||
}
|
}
|
||||||
return JsonUtil.toJsonObject(result).toString();
|
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
return result;
|
||||||
} finally {
|
} finally {
|
||||||
blockOnIO();
|
blockOnIO();
|
||||||
}
|
}
|
||||||
|
|
|
@ -1810,11 +1810,11 @@ public class QueueControlTest extends ManagementTestBase {
|
||||||
|
|
||||||
QueueControl queueControl = createManagementControl(address, queue);
|
QueueControl queueControl = createManagementControl(address, queue);
|
||||||
Assert.assertEquals(120, getMessageCount(queueControl));
|
Assert.assertEquals(120, getMessageCount(queueControl));
|
||||||
String result = queueControl.countMessagesProperty(key);
|
String result = queueControl.countMessages(null, key);
|
||||||
JsonObject jsonObject = JsonUtil.readJsonObject(result);
|
JsonObject jsonObject = JsonUtil.readJsonObject(result);
|
||||||
Assert.assertEquals(34, jsonObject.getInt(valueGroup1));
|
Assert.assertEquals(34, jsonObject.getInt(valueGroup1));
|
||||||
Assert.assertEquals(66, jsonObject.getInt(valueGroup2));
|
Assert.assertEquals(66, jsonObject.getInt(valueGroup2));
|
||||||
Assert.assertEquals(20, jsonObject.getInt("*_UNDEFINED_*"));
|
Assert.assertEquals(20, jsonObject.getInt("null"));
|
||||||
session.deleteQueue(queue);
|
session.deleteQueue(queue);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1834,9 +1834,9 @@ public class QueueControlTest extends ManagementTestBase {
|
||||||
|
|
||||||
QueueControl queueControl = createManagementControl(address, queue);
|
QueueControl queueControl = createManagementControl(address, queue);
|
||||||
Assert.assertEquals(100, getMessageCount(queueControl));
|
Assert.assertEquals(100, getMessageCount(queueControl));
|
||||||
String result = queueControl.countMessagesProperty(null);
|
String result = queueControl.countMessages(null,null);
|
||||||
JsonObject jsonObject = JsonUtil.readJsonObject(result);
|
JsonObject jsonObject = JsonUtil.readJsonObject(result);
|
||||||
Assert.assertEquals(100, jsonObject.getInt("*_UNDEFINED_*"));
|
Assert.assertEquals(100, jsonObject.getInt("null"));
|
||||||
session.deleteQueue(queue);
|
session.deleteQueue(queue);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -102,8 +102,8 @@ public class QueueControlUsingCoreTest extends QueueControlTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String countMessagesProperty(final String filter) throws Exception {
|
public String countMessages(final String filter, final String groupByFilter) throws Exception {
|
||||||
return (String) proxy.invokeOperation(String.class, "countMessagesProperty", filter);
|
return (String) proxy.invokeOperation(String.class, "countMessages", filter, groupByFilter);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
Loading…
Reference in New Issue