diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java index d7ab42faf4..af29422667 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java @@ -461,11 +461,22 @@ public final class PageSubscriptionImpl implements PageSubscription { } + private void confirmPosition(final Transaction tx, final PagePosition position, final long persistentSize) throws Exception { + // if the cursor is persistent + if (persistent) { + store.storeCursorAcknowledgeTransactional(tx.getID(), cursorId, position); + } + installTXCallback(tx, position, persistentSize); + } + @Override public void ackTx(final Transaction tx, final PagedReference reference) throws Exception { - confirmPosition(tx, reference.getPosition()); + //pre-calculate persistentSize + final long persistentSize = getPersistentSize(reference); - counter.increment(tx, -1, -getPersistentSize(reference)); + confirmPosition(tx, reference.getPosition(), persistentSize); + + counter.increment(tx, -1, -persistentSize); PageTransactionInfo txInfo = getPageTransaction(reference); if (txInfo != null) { @@ -864,11 +875,16 @@ public final class PageSubscriptionImpl implements PageSubscription { return info; } + private void installTXCallback(final Transaction tx, final PagePosition position) { + installTXCallback(tx, position, -1); + } + /** * @param tx * @param position + * @param persistentSize if negative it needs to be calculated on the fly */ - private void installTXCallback(final Transaction tx, final PagePosition position) { + private void installTXCallback(final Transaction tx, final PagePosition position, final long persistentSize) { if (position.getRecordID() >= 0) { // It needs to persist, otherwise the cursor will return to the fist page position tx.setContainsPersistent(); @@ -876,9 +892,15 @@ public final class PageSubscriptionImpl implements PageSubscription { PageCursorInfo info = getPageInfo(position); PageCache cache = info.getCache(); - long size = 0; if (cache != null) { - size = getPersistentSize(cache.getMessage(position.getMessageNr())); + final long size; + if (persistentSize < 0) { + //cache.getMessage is potentially expensive depending + //on the current cache size and which message is queried + size = getPersistentSize(cache.getMessage(position.getMessageNr())); + } else { + size = persistentSize; + } position.setPersistentSize(size); }