ARTEMIS-1650 Improve paged message acknowledge

Cache `messageID`, `transactionID` and `isLargeMessage`
in PagedReference, so that when acknowledge, we do not have to
get PagedMessage which may be GCed and cause re-read entire page.
This commit is contained in:
huaishk 2018-01-31 09:48:43 +08:00 committed by Clebert Suconic
parent 33b265ca6b
commit 822445a717
8 changed files with 47 additions and 4 deletions

View File

@ -24,4 +24,8 @@ public interface PagedReference extends MessageReference {
PagePosition getPosition(); PagePosition getPosition();
PagedMessage getPagedMessage(); PagedMessage getPagedMessage();
boolean isLargeMessage();
long getTransactionID();
} }

View File

@ -53,6 +53,12 @@ public class PagedReferenceImpl extends LinkedListImpl.Node<PagedReferenceImpl>
private Object protocolData; private Object protocolData;
private final boolean largeMessage;
private final long transactionID;
private final long messageID;
@Override @Override
public Object getProtocolData() { public Object getProtocolData() {
return protocolData; return protocolData;
@ -95,6 +101,9 @@ public class PagedReferenceImpl extends LinkedListImpl.Node<PagedReferenceImpl>
this.position = position; this.position = position;
this.message = new WeakReference<>(message); this.message = new WeakReference<>(message);
this.subscription = subscription; this.subscription = subscription;
this.largeMessage = message.getMessage().isLargeMessage();
this.transactionID = message.getTransactionID();
this.messageID = message.getMessage().getMessageID();
} }
@Override @Override
@ -256,4 +265,19 @@ public class PagedReferenceImpl extends LinkedListImpl.Node<PagedReferenceImpl>
return this.consumerId; return this.consumerId;
} }
@Override
public boolean isLargeMessage() {
return largeMessage;
}
@Override
public long getTransactionID() {
return transactionID;
}
@Override
public long getMessageID() {
return messageID;
}
} }

View File

@ -849,8 +849,8 @@ final class PageSubscriptionImpl implements PageSubscription {
} }
private PageTransactionInfo getPageTransaction(final PagedReference reference) throws ActiveMQException { private PageTransactionInfo getPageTransaction(final PagedReference reference) throws ActiveMQException {
if (reference.getPagedMessage().getTransactionID() >= 0) { if (reference.getTransactionID() >= 0) {
return pageStore.getPagingManager().getTransaction(reference.getPagedMessage().getTransactionID()); return pageStore.getPagingManager().getTransaction(reference.getTransactionID());
} else { } else {
return null; return null;
} }

View File

@ -38,6 +38,8 @@ public interface MessageReference {
Message getMessage(); Message getMessage();
long getMessageID();
/** /**
* We define this method aggregation here because on paging we need to hold the original estimate, * We define this method aggregation here because on paging we need to hold the original estimate,
* so we need to perform some extra steps on paging. * so we need to perform some extra steps on paging.

View File

@ -237,6 +237,11 @@ public class LastValueQueue extends QueueImpl {
return ref.getMessage(); return ref.getMessage();
} }
@Override
public long getMessageID() {
return getMessage().getMessageID();
}
@Override @Override
public Queue getQueue() { public Queue getQueue() {
return ref.getQueue(); return ref.getQueue();

View File

@ -146,6 +146,11 @@ public class MessageReferenceImpl extends LinkedListImpl.Node<MessageReferenceIm
return message; return message;
} }
@Override
public long getMessageID() {
return getMessage().getMessageID();
}
@Override @Override
public Queue getQueue() { public Queue getQueue() {
return queue; return queue;

View File

@ -24,6 +24,7 @@ import java.util.Map;
import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.paging.cursor.NonExistentPage; import org.apache.activemq.artemis.core.paging.cursor.NonExistentPage;
import org.apache.activemq.artemis.core.paging.cursor.PagedReference;
import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.MessageReference;
@ -159,7 +160,9 @@ public class RefsOperation extends TransactionOperationAbstract {
if (pagedMessagesToPostACK != null) { if (pagedMessagesToPostACK != null) {
for (MessageReference refmsg : pagedMessagesToPostACK) { for (MessageReference refmsg : pagedMessagesToPostACK) {
decrementRefCount(refmsg); if (((PagedReference) refmsg).isLargeMessage()) {
decrementRefCount(refmsg);
}
} }
} }
} }

View File

@ -867,7 +867,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
acks++; acks++;
} }
while (ref.getMessage().getMessageID() != messageID); while (ref.getMessageID() != messageID);
if (startedTransaction) { if (startedTransaction) {
tx.commit(); tx.commit();