diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java index 6778840a15..2bba05edca 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java @@ -458,11 +458,10 @@ public abstract class PrefetchSubscription extends AbstractSubscription { if (node == null) { break; } - if(isDropped(node)) { - pending.remove(); - } - else if (canDispatch(node)) { - pending.remove(); + + pending.remove(); + if( !isDropped(node) && canDispatch(node)) { + // Message may have been sitting in the pending // list a while waiting for the consumer to ak the message. if (node != QueueMessageReference.NULL_MESSAGE diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java index 051ea9ed77..5b7c8c8a73 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -1002,7 +1002,6 @@ public class Queue extends BaseDestination implements Task { reference.drop(); acknowledge(context, sub, ack, reference); destinationStatistics.getMessages().decrement(); - reference.decrementReferenceCount(); synchronized(pagedInMessages) { pagedInMessages.remove(reference.getMessageId()); } @@ -1056,6 +1055,7 @@ public class Queue extends BaseDestination implements Task { messages.reset(); while (messages.hasNext() && count < toPageIn) { MessageReference node = messages.next(); + node.incrementReferenceCount(); messages.remove(); if (!broker.isExpired(node)) { QueueMessageReference ref = createMessageReference(node.getMessage()); @@ -1097,7 +1097,6 @@ public class Queue extends BaseDestination implements Task { if (dispatchSelector.canSelect(s, node)) { if (!s.isFull()) { s.add(node); - node.incrementReferenceCount(); target = s; break; } else {