This commit is contained in:
Clebert Suconic 2018-02-08 09:12:57 -05:00
commit 2eac1959df
8 changed files with 47 additions and 4 deletions

View File

@ -24,4 +24,8 @@ public interface PagedReference extends MessageReference {
PagePosition getPosition();
PagedMessage getPagedMessage();
boolean isLargeMessage();
long getTransactionID();
}

View File

@ -53,6 +53,12 @@ public class PagedReferenceImpl extends LinkedListImpl.Node<PagedReferenceImpl>
private Object protocolData;
private final boolean largeMessage;
private final long transactionID;
private final long messageID;
@Override
public Object getProtocolData() {
return protocolData;
@ -95,6 +101,9 @@ public class PagedReferenceImpl extends LinkedListImpl.Node<PagedReferenceImpl>
this.position = position;
this.message = new WeakReference<>(message);
this.subscription = subscription;
this.largeMessage = message.getMessage().isLargeMessage();
this.transactionID = message.getTransactionID();
this.messageID = message.getMessage().getMessageID();
}
@Override
@ -256,4 +265,19 @@ public class PagedReferenceImpl extends LinkedListImpl.Node<PagedReferenceImpl>
return this.consumerId;
}
@Override
public boolean isLargeMessage() {
return largeMessage;
}
@Override
public long getTransactionID() {
return transactionID;
}
@Override
public long getMessageID() {
return messageID;
}
}

View File

@ -849,8 +849,8 @@ final class PageSubscriptionImpl implements PageSubscription {
}
private PageTransactionInfo getPageTransaction(final PagedReference reference) throws ActiveMQException {
if (reference.getPagedMessage().getTransactionID() >= 0) {
return pageStore.getPagingManager().getTransaction(reference.getPagedMessage().getTransactionID());
if (reference.getTransactionID() >= 0) {
return pageStore.getPagingManager().getTransaction(reference.getTransactionID());
} else {
return null;
}

View File

@ -38,6 +38,8 @@ public interface MessageReference {
Message getMessage();
long getMessageID();
/**
* We define this method aggregation here because on paging we need to hold the original estimate,
* so we need to perform some extra steps on paging.

View File

@ -237,6 +237,11 @@ public class LastValueQueue extends QueueImpl {
return ref.getMessage();
}
@Override
public long getMessageID() {
return getMessage().getMessageID();
}
@Override
public Queue getQueue() {
return ref.getQueue();

View File

@ -146,6 +146,11 @@ public class MessageReferenceImpl extends LinkedListImpl.Node<MessageReferenceIm
return message;
}
@Override
public long getMessageID() {
return getMessage().getMessageID();
}
@Override
public Queue getQueue() {
return queue;

View File

@ -24,6 +24,7 @@ import java.util.Map;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.paging.cursor.NonExistentPage;
import org.apache.activemq.artemis.core.paging.cursor.PagedReference;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.MessageReference;
@ -159,10 +160,12 @@ public class RefsOperation extends TransactionOperationAbstract {
if (pagedMessagesToPostACK != null) {
for (MessageReference refmsg : pagedMessagesToPostACK) {
if (((PagedReference) refmsg).isLargeMessage()) {
decrementRefCount(refmsg);
}
}
}
}
private void decrementRefCount(MessageReference refmsg) {
try {

View File

@ -867,7 +867,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
acks++;
}
while (ref.getMessage().getMessageID() != messageID);
while (ref.getMessageID() != messageID);
if (startedTransaction) {
tx.commit();