ARTEMIS-3769 Fixing queue browsing iterator avoiding NoSuchElement exception from the Iteration

QueueImpl::browserIterator could throw NoSuchElementException and this is fixing the iterator
Found this while testing ARTEMIS-3761
This commit is contained in:
Clebert Suconic 2022-04-07 09:19:16 -04:00 committed by clebertsuconic
parent bad1c26582
commit 1da68b3024
3 changed files with 59 additions and 17 deletions

View File

@ -4287,13 +4287,25 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
@Override
public boolean hasNext() {
if (messagesIterator != null && messagesIterator.hasNext()) {
if (cachedNext != null) {
return true;
}
if (messagesIterator != null) {
MessageReference nextMessage = iterate(messagesIterator);
if (nextMessage != null) {
cachedNext = nextMessage;
lastIterator = messagesIterator;
return true;
}
if (getPagingIterator() != null) {
if (getPagingIterator().hasNext()) {
lastIterator = getPagingIterator();
}
LinkedListIterator<PagedReference> pagingIterator = getPagingIterator();
if (pagingIterator != null) {
PagedReference nextMessage = iteratePaging(pagingIterator);
if (nextMessage != null) {
cachedNext = nextMessage;
lastIterator = pagingIterator;
return true;
}
}
@ -4301,6 +4313,34 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
return false;
}
private PagedReference iteratePaging(LinkedListIterator<PagedReference> iterator) {
while (iterator.hasNext()) {
PagedReference ref = iterator.next();
// During regular depaging we move messages from paging into QueueImpl::messageReferences
// later on the PagingIterator will read messages from the page files
// and this step will avoid reproducing those messages twice.
// once we found a previouslyBrowsed message we can remove it from this list as it's no longer needed
// since it won't be read again
if (!previouslyBrowsed.remove(ref.getPosition())) {
return ref;
}
}
return null;
}
private MessageReference iterate(LinkedListIterator<MessageReference> iterator) {
while (iterator.hasNext()) {
MessageReference ref = iterator.next();
if (ref.isPaged()) {
previouslyBrowsed.add(((PagedReference)ref).getPosition());
}
return ref;
}
return null;
}
@Override
public MessageReference next() {
@ -4312,20 +4352,18 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
}
if (messagesIterator != null && messagesIterator.hasNext()) {
MessageReference msg = messagesIterator.next();
if (msg.isPaged()) {
previouslyBrowsed.add(((PagedReference) msg).getPosition());
MessageReference ref = iterate(messagesIterator);
if (ref != null) {
return ref;
}
return msg;
}
if (getPagingIterator() != null) {
while (getPagingIterator().hasNext()) {
lastIterator = getPagingIterator();
PagedReference ref = getPagingIterator().next();
if (previouslyBrowsed.contains(ref.getPosition())) {
continue;
}
LinkedListIterator<PagedReference> pagingIterator = getPagingIterator();
if (pagingIterator != null) {
PagedReference ref = iteratePaging(pagingIterator);
if (ref != null) {
return ref;
}
}

View File

@ -179,7 +179,7 @@ public class ScaleDownHandler {
MessageReference messageReference = messagesIterator.next();
Message message = messageReference.getMessage().copy();
logger.debug("Reading message " + message + " from queue " + loopQueue);
logger.debugf("Reading message %s from queue %s", message, loopQueue);
Set<QueuesXRefInnerManager> queuesFound = new HashSet<>();
for (Map.Entry<Queue, QueuesXRefInnerManager> controlEntry : controls.entrySet()) {
@ -221,6 +221,7 @@ public class ScaleDownHandler {
}
}
} catch (NoSuchElementException ignored) {
logger.debug(ignored.getMessage(), ignored);
// this could happen through paging browsing
}
}

View File

@ -1465,12 +1465,15 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
ref = null;
synchronized (messageQueue) {
if (!iterator.hasNext()) {
logger.tracef("browser finished");
callback.browserFinished(ServerConsumerImpl.this);
break;
}
ref = iterator.next();
logger.tracef("Receiving %s", ref.getMessage());
status = handle(ref);
}