ARTEMIS-828 Queue browsing can be out of sync while paging

https://issues.apache.org/jira/browse/ARTEMIS-828
This commit is contained in:
Clebert Suconic 2016-10-27 17:30:34 -04:00
parent e0021252ee
commit bfb9bedb2d
12 changed files with 81 additions and 39 deletions

View File

@ -410,7 +410,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
Filter filter = FilterImpl.createFilter(filterStr);
List<Map<String, Object>> messages = new ArrayList<>();
queue.flushExecutor();
try (LinkedListIterator<MessageReference> iterator = queue.totalIterator()) {
try (LinkedListIterator<MessageReference> iterator = queue.browserIterator()) {
while (iterator.hasNext()) {
MessageReference ref = iterator.next();
if (filter == null || filter.match(ref.getMessage())) {
@ -446,7 +446,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
try {
List<Map<String, Object>> messages = new ArrayList<>();
queue.flushExecutor();
try (LinkedListIterator<MessageReference> iterator = queue.totalIterator()) {
try (LinkedListIterator<MessageReference> iterator = queue.browserIterator()) {
// returns just the first, as it's the first only
if (iterator.hasNext()) {
MessageReference ref = iterator.next();
@ -499,7 +499,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
if (filter == null) {
return getMessageCount();
} else {
try (LinkedListIterator<MessageReference> iterator = queue.totalIterator()) {
try (LinkedListIterator<MessageReference> iterator = queue.browserIterator()) {
int count = 0;
while (iterator.hasNext()) {
MessageReference ref = iterator.next();
@ -895,7 +895,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
ArrayList<CompositeData> c = new ArrayList<>();
Filter filter = FilterImpl.createFilter(filterStr);
queue.flushExecutor();
try (LinkedListIterator<MessageReference> iterator = queue.totalIterator()) {
try (LinkedListIterator<MessageReference> iterator = queue.browserIterator()) {
while (iterator.hasNext() && currentPageSize++ < pageSize) {
MessageReference ref = iterator.next();
if (filter == null || filter.match(ref.getMessage())) {

View File

@ -56,7 +56,10 @@ public interface PageSubscription {
LinkedListIterator<PagedReference> iterator();
// To be called when the cursor is closed for good. Most likely when the queue is deleted
LinkedListIterator<PagedReference> iterator(boolean jumpRemoves);
// To be called when the cursor is closed for good. Most likely when the queue is deleted
void destroy() throws Exception;
void scheduleCleanupCheck();

View File

@ -251,7 +251,6 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter {
recordID = -1;
value.set(0);
added.set(0);
incrementRecords.clear();
}
} finally {

View File

@ -351,6 +351,11 @@ final class PageSubscriptionImpl implements PageSubscription {
return new CursorIterator();
}
@Override
public PageIterator iterator(boolean browsing) {
return new CursorIterator(browsing);
}
private PagedReference internalGetNext(final PagePosition pos) {
PagePosition retPos = pos.nextMessage();
@ -1100,6 +1105,8 @@ final class PageSubscriptionImpl implements PageSubscription {
private volatile PagedReference lastRedelivery = null;
private final boolean browsing;
// We only store the position for redeliveries. They will be read from the SoftCache again during delivery.
private final java.util.Queue<PagePosition> redeliveries = new LinkedList<>();
@ -1109,7 +1116,13 @@ final class PageSubscriptionImpl implements PageSubscription {
*/
private volatile PagedReference cachedNext;
private CursorIterator(boolean browsing) {
this.browsing = browsing;
}
private CursorIterator() {
this.browsing = false;
}
@Override
@ -1199,7 +1212,7 @@ final class PageSubscriptionImpl implements PageSubscription {
PageCursorInfo info = getPageInfo(message.getPosition().getPageNr());
if (info != null && (info.isRemoved(message.getPosition()) || info.getCompleteInfo() != null)) {
if (!browsing && info != null && (info.isRemoved(message.getPosition()) || info.getCompleteInfo() != null)) {
continue;
}
@ -1225,7 +1238,7 @@ final class PageSubscriptionImpl implements PageSubscription {
// nothing
// is being changed. That's why the false is passed as a parameter here
if (info != null && info.isRemoved(message.getPosition())) {
if (!browsing && info != null && info.isRemoved(message.getPosition())) {
valid = false;
}
}
@ -1237,10 +1250,10 @@ final class PageSubscriptionImpl implements PageSubscription {
if (valid) {
match = match(message.getMessage());
if (!match) {
if (!browsing && !match) {
processACK(message.getPosition());
}
} else if (ignored) {
} else if (!browsing && ignored) {
positionIgnored(message.getPosition());
}
} while (!match);

View File

@ -195,7 +195,7 @@ public interface Queue extends Bindable {
*/
LinkedListIterator<MessageReference> iterator();
LinkedListIterator<MessageReference> totalIterator();
LinkedListIterator<MessageReference> browserIterator();
SimpleString getExpiryAddress();

View File

@ -867,8 +867,8 @@ public class QueueImpl implements Queue {
}
@Override
public TotalQueueIterator totalIterator() {
return new TotalQueueIterator();
public QueueBrowserIterator browserIterator() {
return new QueueBrowserIterator();
}
@Override
@ -2863,17 +2863,23 @@ public class QueueImpl implements Queue {
//Readonly (no remove) iterator over the messages in the queue, in order of
//paging store, intermediateMessageReferences and MessageReferences
private class TotalQueueIterator implements LinkedListIterator<MessageReference> {
private class QueueBrowserIterator implements LinkedListIterator<MessageReference> {
LinkedListIterator<PagedReference> pageIter = null;
LinkedListIterator<PagedReference> pagingIterator = null;
LinkedListIterator<MessageReference> messagesIterator = null;
private LinkedListIterator<PagedReference> getPagingIterator() {
if (pagingIterator == null) {
pagingIterator = pageSubscription.iterator(true);
}
return pagingIterator;
}
Iterator lastIterator = null;
private TotalQueueIterator() {
if (pageSubscription != null) {
pageIter = pageSubscription.iterator();
}
MessageReference cachedNext = null;
private QueueBrowserIterator() {
messagesIterator = new SynchronizedIterator(messageReferences.iterator());
}
@ -2883,9 +2889,9 @@ public class QueueImpl implements Queue {
lastIterator = messagesIterator;
return true;
}
if (pageIter != null) {
if (pageIter.hasNext()) {
lastIterator = pageIter;
if (getPagingIterator() != null) {
if (getPagingIterator().hasNext()) {
lastIterator = getPagingIterator();
return true;
}
}
@ -2893,16 +2899,37 @@ public class QueueImpl implements Queue {
return false;
}
@Override
public MessageReference next() {
if (messagesIterator != null && messagesIterator.hasNext()) {
MessageReference msg = messagesIterator.next();
return msg;
if (cachedNext != null) {
try {
return cachedNext;
} finally {
cachedNext = null;
}
}
if (pageIter != null) {
if (pageIter.hasNext()) {
lastIterator = pageIter;
return pageIter.next();
while (true) {
if (messagesIterator != null && messagesIterator.hasNext()) {
MessageReference msg = messagesIterator.next();
if (msg.isPaged()) {
System.out.println("** Rejecting because it's paged " + msg.getMessage());
continue;
}
// System.out.println("** Returning because it's not paged " + msg.getMessage());
return msg;
} else {
break;
}
}
if (getPagingIterator() != null) {
if (getPagingIterator().hasNext()) {
lastIterator = getPagingIterator();
MessageReference ref = getPagingIterator().next();
return ref;
}
}
@ -2922,8 +2949,8 @@ public class QueueImpl implements Queue {
@Override
public void close() {
if (pageIter != null) {
pageIter.close();
if (getPagingIterator() != null) {
getPagingIterator().close();
}
if (messagesIterator != null) {
messagesIterator.close();

View File

@ -165,7 +165,7 @@ public class ScaleDownHandler {
for (Queue loopQueue : queues) {
logger.debug("Scaling down messages on address " + address + " / performing loop on queue " + loopQueue);
try (LinkedListIterator<MessageReference> messagesIterator = loopQueue.totalIterator()) {
try (LinkedListIterator<MessageReference> messagesIterator = loopQueue.browserIterator()) {
while (messagesIterator.hasNext()) {
MessageReference messageReference = messagesIterator.next();
@ -249,7 +249,7 @@ public class ScaleDownHandler {
for (Queue queue : queues) {
// using auto-closeable
try (LinkedListIterator<MessageReference> messagesIterator = queue.totalIterator()) {
try (LinkedListIterator<MessageReference> messagesIterator = queue.browserIterator()) {
// loop through every message of this queue
while (messagesIterator.hasNext()) {
MessageReference messageRef = messagesIterator.next();

View File

@ -206,7 +206,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
this.creationTime = System.currentTimeMillis();
if (browseOnly) {
browserDeliverer = new BrowserDeliverer(messageQueue.totalIterator());
browserDeliverer = new BrowserDeliverer(messageQueue.browserIterator());
} else {
messageQueue.addConsumer(this);
}

View File

@ -1212,7 +1212,7 @@ public class ScheduledDeliveryHandlerTest extends Assert {
}
@Override
public LinkedListIterator<MessageReference> totalIterator() {
public LinkedListIterator<MessageReference> browserIterator() {
return null;
}

View File

@ -308,7 +308,7 @@ public class PagingSendTest extends ActiveMQTestBase {
* duplicates that may have happened before this point).
*/
public void checkBatchMessagesAreNotPagedTwice(Queue queue) throws Exception {
LinkedListIterator<MessageReference> pageIterator = queue.totalIterator();
LinkedListIterator<MessageReference> pageIterator = queue.browserIterator();
Set<String> messageOrderSet = new HashSet<>();
@ -344,7 +344,7 @@ public class PagingSendTest extends ActiveMQTestBase {
* duplicates that may have happened before this point).
*/
protected int processCountThroughIterator(Queue queue) throws Exception {
LinkedListIterator<MessageReference> pageIterator = queue.totalIterator();
LinkedListIterator<MessageReference> pageIterator = queue.browserIterator();
int count = 0;
while (pageIterator.hasNext()) {

View File

@ -604,7 +604,7 @@ public class FakeQueue implements Queue {
}
@Override
public LinkedListIterator<MessageReference> totalIterator() {
public LinkedListIterator<MessageReference> browserIterator() {
// TODO Auto-generated method stub
return null;
}

View File

@ -1274,7 +1274,7 @@ public class QueueImplTest extends ActiveMQTestBase {
locator.close();
Queue queue = ((LocalQueueBinding) server.getPostOffice().getBinding(new SimpleString(MY_QUEUE))).getQueue();
LinkedListIterator<MessageReference> totalIterator = queue.totalIterator();
LinkedListIterator<MessageReference> totalIterator = queue.browserIterator();
try {
int i = 0;