diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java index af29422667..6a272cf0d6 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java @@ -25,6 +25,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Objects; import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; @@ -213,7 +214,7 @@ public final class PageSubscriptionImpl implements PageSubscription { pageStore.getCurrentPage().getPageId() == position.getPageNr()) { pageStore.forceAnotherPage(); } - PageCursorInfo info = new PageCursorInfo(position.getPageNr(), position.getMessageNr(), null); + PageCursorInfo info = new PageCursorInfo(position.getPageNr(), position.getMessageNr()); info.setCompleteInfo(position); synchronized (consumedPages) { consumedPages.put(Long.valueOf(position.getPageNr()), info); @@ -809,7 +810,8 @@ public final class PageSubscriptionImpl implements PageSubscription { if (cache == null) { return null; } - pageInfo = new PageCursorInfo(pageNr, cache.getNumberOfMessages(), cache); + assert pageNr == cache.getPageId(); + pageInfo = new PageCursorInfo(cache); consumedPages.put(pageNr, pageInfo); } return pageInfo; @@ -822,7 +824,7 @@ public final class PageSubscriptionImpl implements PageSubscription { synchronized (consumedPages) { PageCursorInfo pageInfo = consumedPages.get(cache.getPageId()); if (pageInfo == null) { - consumedPages.put(cache.getPageId(), new PageCursorInfo(cache.getPageId(), cache.getNumberOfMessages(), cache)); + consumedPages.put(cache.getPageId(), new PageCursorInfo(cache)); } } } @@ -950,7 +952,7 @@ public final class PageSubscriptionImpl implements PageSubscription { public final class PageCursorInfo { // Number of messages existent on this page - private final int numberOfMessages; + private int numberOfMessages; private final long pageId; @@ -1008,15 +1010,30 @@ public final class PageSubscriptionImpl implements PageSubscription { } } - private PageCursorInfo(final long pageId, final int numberOfMessages, final PageCache cache) { - logger.tracef("Created PageCursorInfo for pageNr=%d, numberOfMessages=%d, cache=%s", pageId, numberOfMessages, cache); + private PageCursorInfo(final long pageId, final int numberOfMessages) { + if (numberOfMessages < 0) { + throw new IllegalStateException("numberOfMessages = " + numberOfMessages + " instead of being >=0"); + } this.pageId = pageId; + wasLive = false; this.numberOfMessages = numberOfMessages; - if (cache != null) { - wasLive = cache.isLive(); - this.cache = new WeakReference<>(cache); + logger.tracef("Created PageCursorInfo for pageNr=%d, numberOfMessages=%d, not live", pageId, numberOfMessages); + } + + private PageCursorInfo(final PageCache cache) { + Objects.requireNonNull(cache); + this.pageId = cache.getPageId(); + wasLive = cache.isLive(); + this.cache = new WeakReference<>(cache); + if (!wasLive) { + final int numberOfMessages = cache.getNumberOfMessages(); + assert numberOfMessages >= 0; + this.numberOfMessages = numberOfMessages; + logger.tracef("Created PageCursorInfo for pageNr=%d, numberOfMessages=%d, cache=%s, not live", pageId, this.numberOfMessages, cache); } else { - wasLive = false; + //given that is live, the exact value must be get directly from cache + this.numberOfMessages = -1; + logger.tracef("Created PageCursorInfo for pageNr=%d, cache=%s, live", pageId, cache); } } @@ -1117,18 +1134,32 @@ public final class PageSubscriptionImpl implements PageSubscription { } } - private int getNumberOfMessages() { - if (wasLive) { - // if the page was live at any point, we need to - // get the number of messages from the page-cache - PageCache localcache = this.cache.get(); - if (localcache == null) { - localcache = cursorProvider.getPageCache(pageId); - this.cache = new WeakReference<>(localcache); - } + private int getNumberOfMessagesFromPageCache() { + // if the page was live at any point, we need to + // get the number of messages from the page-cache + PageCache localCache = this.cache.get(); + if (localCache == null) { + localCache = cursorProvider.getPageCache(pageId); + this.cache = new WeakReference<>(localCache); + } + int numberOfMessage = localCache.getNumberOfMessages(); + if (!localCache.isLive()) { + //to avoid further "live" queries + this.numberOfMessages = numberOfMessage; + } + return numberOfMessage; + } - return localcache.getNumberOfMessages(); + private int getNumberOfMessages() { + final int numberOfMessages = this.numberOfMessages; + if (wasLive) { + if (numberOfMessages < 0) { + return getNumberOfMessagesFromPageCache(); + } else { + return numberOfMessages; + } } else { + assert numberOfMessages >= 0; return numberOfMessages; } }