diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java index 1c706e9d7f..6778840a15 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java @@ -610,15 +610,4 @@ public abstract class PrefetchSubscription extends AbstractSubscription { public void setMaxAuditDepth(int maxAuditDepth) { this.maxAuditDepth = maxAuditDepth; } - - - public List getInFlightMessages(){ - List result = new ArrayList(); - synchronized(pendingLock) { - result.addAll(dispatched); - result.addAll(pending.pageInList(1000)); - } - return result; - } - } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java index 9207353b8f..9439a25d0f 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -944,6 +944,7 @@ public class Queue extends BaseDestination implements Task { reference.drop(); acknowledge(context, sub, ack, reference); destinationStatistics.getMessages().decrement(); + reference.decrementReferenceCount(); synchronized(pagedInMessages) { pagedInMessages.remove(reference.getMessageId()); } @@ -1034,6 +1035,7 @@ public class Queue extends BaseDestination implements Task { if (dispatchSelector.canSelect(s, node)) { if (!s.isFull()) { s.add(node); + node.incrementReferenceCount(); target = s; break; } else { @@ -1055,6 +1057,7 @@ public class Queue extends BaseDestination implements Task { } if (target != null) { target.add(node); + node.incrementReferenceCount(); } } if (target != null && !strictOrderDispatch && consumers.size() > 1 && diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java index 9c4fb03cca..48034f5fd7 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java @@ -107,59 +107,6 @@ public class QueueSubscription extends PrefetchSubscription implements LockOwner return info.isExclusive(); } - /** - * Override so that the message ref count is > 0 only when the message is - * being dispatched to a client. Keeping it at 0 when it is in the pending - * list allows the message to be swapped out to disk. - * - * @return true if the message was dispatched. - */ - protected boolean dispatch(MessageReference node) throws IOException { - boolean rc = false; - // This brings the message into memory if it was swapped out. - node.incrementReferenceCount(); - try { - rc = super.dispatch(node); - } finally { - // If the message was dispatched, it could be getting dispatched - // async, so we - // can only drop the reference count when that completes @see - // onDispatch - if (!rc) { - node.decrementReferenceCount(); - } - } - return rc; - } - - /** - * OK Message was transmitted, we can now drop the reference count. - * - * @see org.apache.activemq.broker.region.PrefetchSubscription#onDispatch(org.apache.activemq.broker.region.MessageReference, - * org.apache.activemq.command.Message) - */ - protected void onDispatch(MessageReference node, Message message) { - // Now that the message has been sent over the wire to the client, - // we can let it get swapped out. - node.decrementReferenceCount(); - super.onDispatch(node, message); - } - - /** - * Sending a message to the DQL will require us to increment the ref count - * so we can get at the content. - */ - protected void sendToDLQ(ConnectionContext context, MessageReference node) throws IOException, Exception { - // This brings the message into memory if it was swapped out. - node.incrementReferenceCount(); - try { - super.sendToDLQ(context, node); - } finally { - // This let's the message be swapped out of needed. - node.decrementReferenceCount(); - } - } - /** */ public void destroy() { diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java index 2eec6e2b2d..0642be9d9d 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java @@ -215,11 +215,4 @@ public interface Subscription extends SubscriptionRecovery { * @return true if a browser */ boolean isBrowser(); - - /** - * Get the list of in flight messages - * @return list - */ - List getInFlightMessages(); - }