diff --git a/activemq-core/src/main/java/org/activemq/broker/region/PrefetchSubscription.java b/activemq-core/src/main/java/org/activemq/broker/region/PrefetchSubscription.java index 85d38a26c0..88aac87b67 100755 --- a/activemq-core/src/main/java/org/activemq/broker/region/PrefetchSubscription.java +++ b/activemq-core/src/main/java/org/activemq/broker/region/PrefetchSubscription.java @@ -256,6 +256,8 @@ abstract public class PrefetchSubscription extends AbstractSubscription { if( info.isDispatchAsync() ) { md.setConsumer(new Runnable(){ public void run() { + // Since the message gets queued up in async dispatch, we don't want to + // decrease the reference count until it gets put on the wire. onDispatch(node, message); } }); @@ -264,6 +266,10 @@ abstract public class PrefetchSubscription extends AbstractSubscription { context.getConnection().dispatchSync(md); onDispatch(node, message); } + // The onDispatch() does the node.decrementReferenceCount(); + } else { + // We were not allowed to dispatch that message (an other consumer grabbed it before we did) + node.decrementReferenceCount(); } }