From c9c11074a8b278fdf3930dba6db9a0bcecea69b3 Mon Sep 17 00:00:00 2001 From: "Hiram R. Chirino" Date: Thu, 20 Mar 2008 15:31:34 +0000 Subject: [PATCH] Fix queue reference counting. git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@639315 13f79535-47bb-0310-9956-ffa450edef68 --- .../activemq/broker/region/PrefetchSubscription.java | 9 ++++----- .../java/org/apache/activemq/broker/region/Queue.java | 3 +-- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java index 6778840a15..2bba05edca 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java @@ -458,11 +458,10 @@ public abstract class PrefetchSubscription extends AbstractSubscription { if (node == null) { break; } - if(isDropped(node)) { - pending.remove(); - } - else if (canDispatch(node)) { - pending.remove(); + + pending.remove(); + if( !isDropped(node) && canDispatch(node)) { + // Message may have been sitting in the pending // list a while waiting for the consumer to ak the message. if (node != QueueMessageReference.NULL_MESSAGE diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java index 051ea9ed77..5b7c8c8a73 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -1002,7 +1002,6 @@ public class Queue extends BaseDestination implements Task { reference.drop(); acknowledge(context, sub, ack, reference); destinationStatistics.getMessages().decrement(); - reference.decrementReferenceCount(); synchronized(pagedInMessages) { pagedInMessages.remove(reference.getMessageId()); } @@ -1056,6 +1055,7 @@ public class Queue extends BaseDestination implements Task { messages.reset(); while (messages.hasNext() && count < toPageIn) { MessageReference node = messages.next(); + node.incrementReferenceCount(); messages.remove(); if (!broker.isExpired(node)) { QueueMessageReference ref = createMessageReference(node.getMessage()); @@ -1097,7 +1097,6 @@ public class Queue extends BaseDestination implements Task { if (dispatchSelector.canSelect(s, node)) { if (!s.isFull()) { s.add(node); - node.incrementReferenceCount(); target = s; break; } else {