From 822445a717f943c64a84b2ac6a0af8ace9e5cd23 Mon Sep 17 00:00:00 2001 From: huaishk Date: Wed, 31 Jan 2018 09:48:43 +0800 Subject: [PATCH] ARTEMIS-1650 Improve paged message acknowledge Cache `messageID`, `transactionID` and `isLargeMessage` in PagedReference, so that when acknowledge, we do not have to get PagedMessage which may be GCed and cause re-read entire page. --- .../core/paging/cursor/PagedReference.java | 4 ++++ .../paging/cursor/PagedReferenceImpl.java | 24 +++++++++++++++++++ .../cursor/impl/PageSubscriptionImpl.java | 4 ++-- .../artemis/core/server/MessageReference.java | 2 ++ .../core/server/impl/LastValueQueue.java | 5 ++++ .../server/impl/MessageReferenceImpl.java | 5 ++++ .../core/server/impl/RefsOperation.java | 5 +++- .../core/server/impl/ServerConsumerImpl.java | 2 +- 8 files changed, 47 insertions(+), 4 deletions(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReference.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReference.java index c1ff089681..be2d042fb4 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReference.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReference.java @@ -24,4 +24,8 @@ public interface PagedReference extends MessageReference { PagePosition getPosition(); PagedMessage getPagedMessage(); + + boolean isLargeMessage(); + + long getTransactionID(); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java index 7189007f7d..42c54230d3 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java @@ -53,6 +53,12 @@ public class PagedReferenceImpl extends LinkedListImpl.Node private Object protocolData; + private final boolean largeMessage; + + private final long transactionID; + + private final long messageID; + @Override public Object getProtocolData() { return protocolData; @@ -95,6 +101,9 @@ public class PagedReferenceImpl extends LinkedListImpl.Node this.position = position; this.message = new WeakReference<>(message); this.subscription = subscription; + this.largeMessage = message.getMessage().isLargeMessage(); + this.transactionID = message.getTransactionID(); + this.messageID = message.getMessage().getMessageID(); } @Override @@ -256,4 +265,19 @@ public class PagedReferenceImpl extends LinkedListImpl.Node return this.consumerId; } + @Override + public boolean isLargeMessage() { + return largeMessage; + } + + @Override + public long getTransactionID() { + return transactionID; + } + + @Override + public long getMessageID() { + return messageID; + } + } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java index a674935a0b..24c69bea47 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java @@ -849,8 +849,8 @@ final class PageSubscriptionImpl implements PageSubscription { } private PageTransactionInfo getPageTransaction(final PagedReference reference) throws ActiveMQException { - if (reference.getPagedMessage().getTransactionID() >= 0) { - return pageStore.getPagingManager().getTransaction(reference.getPagedMessage().getTransactionID()); + if (reference.getTransactionID() >= 0) { + return pageStore.getPagingManager().getTransaction(reference.getTransactionID()); } else { return null; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java index 799b0b0dfc..906ea7e566 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java @@ -38,6 +38,8 @@ public interface MessageReference { Message getMessage(); + long getMessageID(); + /** * We define this method aggregation here because on paging we need to hold the original estimate, * so we need to perform some extra steps on paging. diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java index 7aada5e881..90b88141c0 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java @@ -237,6 +237,11 @@ public class LastValueQueue extends QueueImpl { return ref.getMessage(); } + @Override + public long getMessageID() { + return getMessage().getMessageID(); + } + @Override public Queue getQueue() { return ref.getQueue(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java index 1b434bc28e..7543ba5ba1 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java @@ -146,6 +146,11 @@ public class MessageReferenceImpl extends LinkedListImpl.Node