This commit is contained in:
Clebert Suconic 2021-03-10 08:51:06 -05:00
commit 4f5821d11f
3 changed files with 44 additions and 6 deletions

View File

@ -847,9 +847,11 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
Filter filter = FilterImpl.createFilter(filterStr);
List<Map<String, Object>> messages = new ArrayList<>();
queue.flushExecutor();
final int limit = addressSettingsRepository.getMatch(queue.getAddress().toString()).getManagementBrowsePageSize();
int count = 0;
try (LinkedListIterator<MessageReference> iterator = queue.browserIterator()) {
try {
while (iterator.hasNext()) {
while (iterator.hasNext() && count++ < limit) {
MessageReference ref = iterator.next();
if (filter == null || filter.match(ref.getMessage())) {
Message message = ref.getMessage();
@ -983,9 +985,11 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
if (filter == null && groupByProperty == null) {
result.put(null, getMessageCount());
} else {
final int limit = addressSettingsRepository.getMatch(queue.getAddress().toString()).getManagementBrowsePageSize();
int count = 0;
try (LinkedListIterator<MessageReference> iterator = queue.browserIterator()) {
try {
while (iterator.hasNext()) {
while (iterator.hasNext() && count++ < limit) {
Message message = iterator.next().getMessage();
internalComputeMessage(result, filter, groupByProperty, message);
}
@ -1593,14 +1597,14 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
clearIO();
try {
int pageSize = addressSettingsRepository.getMatch(queue.getName().toString()).getManagementBrowsePageSize();
int limit = addressSettingsRepository.getMatch(queue.getAddress().toString()).getManagementBrowsePageSize();
int currentPageSize = 0;
ArrayList<CompositeData> c = new ArrayList<>();
Filter thefilter = FilterImpl.createFilter(filter);
queue.flushExecutor();
try (LinkedListIterator<MessageReference> iterator = queue.browserIterator()) {
try {
while (iterator.hasNext() && currentPageSize++ < pageSize) {
while (iterator.hasNext() && currentPageSize++ < limit) {
MessageReference ref = iterator.next();
if (thefilter == null || thefilter.match(ref.getMessage())) {
c.add(OpenTypeSupport.convert(ref));

View File

@ -922,8 +922,8 @@ config reload, by delete policy: `OFF` or `FORCE`. Default is `OFF`. Read more
about [configuration reload](config-reload.md).
`management-browse-page-size` is the number of messages a management resource
can browse. This is relevant for the "browse" management method exposed on the
queue control. Default is `200`.
can browse. This is relevant for the `browse, list and count-with-filter` management
methods exposed on the queue control. Default is `200`.
`default-purge-on-no-consumers` defines a queue's default
`purge-on-no-consumers` setting if none is provided on the queue itself.

View File

@ -3331,6 +3331,40 @@ public class QueueControlTest extends ManagementTestBase {
Assert.assertEquals(new String(body), "theBody");
}
@Test
public void testBrowseLimitOnListBrowseAndFilteredCount() throws Exception {
SimpleString address = RandomUtil.randomSimpleString();
SimpleString queue = RandomUtil.randomSimpleString();
AddressSettings addressSettings = new AddressSettings().setManagementBrowsePageSize(5);
server.getAddressSettingsRepository().addMatch(address.toString(), addressSettings);
session.createQueue(new QueueConfiguration(queue).setAddress(address).setDurable(durable));
ClientProducer producer = session.createProducer(address);
for (int i = 0; i < 10; i++) {
producer.send(session.createMessage(true));
}
producer.close();
QueueControl queueControl = createManagementControl(address, queue);
// no filter, delegates to count metric
Wait.assertEquals(10, queueControl::getMessageCount);
assertEquals(5, queueControl.browse().length);
assertEquals(5, queueControl.listMessages("").length);
JsonArray array = JsonUtil.readJsonArray(queueControl.listMessagesAsJSON(""));
assertEquals(5, array.size());
// filer could match all
assertEquals(5, queueControl.countMessages("AMQSize > 0"));
session.deleteQueue(queue);
}
@Test
public void testResetGroups() throws Exception {
SimpleString address = RandomUtil.randomSimpleString();