diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java index ab55f07731..b470ee5efb 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java @@ -111,4 +111,6 @@ public interface Destination extends Service { * @param value */ public void setLazyDispatch(boolean value); + + void messageExpired(ConnectionContext context, PrefetchSubscription prefetchSubscription, MessageReference node); } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java index aa6c84201d..7635675871 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java @@ -202,4 +202,8 @@ public class DestinationFilter implements Destination { public void setLazyDispatch(boolean value) { next.setLazyDispatch(value); } + + public void messageExpired(ConnectionContext context, PrefetchSubscription prefetchSubscription, MessageReference node) { + next.messageExpired(context, prefetchSubscription, node); + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java index 06bbe0f623..20079598c4 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java @@ -246,12 +246,17 @@ public abstract class PrefetchSubscription extends AbstractSubscription { // the // acknowledgment. int index = 0; - for (Iterator iter = dispatched.iterator(); iter - .hasNext(); index++) { + for (Iterator iter = dispatched.iterator(); iter.hasNext(); index++) { final MessageReference node = iter.next(); + if( node.isExpired() ) { + broker.messageExpired(getContext(), node); + node.getRegionDestination().messageExpired(context, this, node); + node.getRegionDestination().getDestinationStatistics().getDequeues().increment(); + node.getRegionDestination().getDestinationStatistics().getInflight().decrement(); + dispatched.remove(node); + } if (ack.getLastMessageId().equals(node.getMessageId())) { - prefetchExtension = Math.max(prefetchExtension, - index + 1); + prefetchExtension = Math.max(prefetchExtension, index + 1); callDispatchMatched = true; break; } @@ -471,12 +476,11 @@ public abstract class PrefetchSubscription extends AbstractSubscription { // Message may have been sitting in the pending // list a while waiting for the consumer to ak the message. - if (node != QueueMessageReference.NULL_MESSAGE - && node.isExpired()) { + if (node!=QueueMessageReference.NULL_MESSAGE && node.isExpired()) { broker.messageExpired(getContext(), node); - dequeueCounter++; //increment number to dispatch numberToDispatch++; + node.getRegionDestination().messageExpired(context, this, node); continue; } dispatch(node); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java index a3c5aadc7a..21c2c3d7a8 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -1003,7 +1003,17 @@ public class Queue extends BaseDestination implements Task { } wakeup(); } - + + public void messageExpired(ConnectionContext context, PrefetchSubscription prefetchSubscription, MessageReference reference) { + ((QueueMessageReference)reference).drop(); + // Not sure.. perhaps we should forge an ack to remove the message from the store. + // acknowledge(context, sub, ack, reference); + destinationStatistics.getMessages().decrement(); + synchronized(pagedInMessages) { + pagedInMessages.remove(reference.getMessageId()); + } + wakeup(); + } protected ConnectionContext createConnectionContext() { ConnectionContext answer = new ConnectionContext(new NonCachedMessageEvaluationContext()); @@ -1037,7 +1047,7 @@ public class Queue extends BaseDestination implements Task { dispatchLock.lock(); try{ - int toPageIn = getMaxPageSize() - pagedInMessages.size(); + int toPageIn = (getMaxPageSize()+(int)destinationStatistics.getInflight().getCount()) - pagedInMessages.size(); if (isLazyDispatch()&& !force) { // Only page in the minimum number of messages which can be dispatched immediately. toPageIn = Math.min(getConsumerMessageCountBeforeFull(), toPageIn); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java index 9df1d91f4a..2abcc74b0d 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java @@ -631,4 +631,9 @@ public class Topic extends BaseDestination implements Task{ } } + public void messageExpired(ConnectionContext context, PrefetchSubscription prefetchSubscription, MessageReference node) { + // TODO Auto-generated method stub + + } + }