diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java index a9b860bf69..78a6792576 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java @@ -42,6 +42,7 @@ public class TopicSubscription extends AbstractSubscription { final protected UsageManager usageManager; protected int dispatched=0; protected int delivered=0; + private int maximumPendingMessages = 0; public TopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info, UsageManager usageManager) throws InvalidSelectorException { super(broker,context, info); @@ -50,11 +51,20 @@ public class TopicSubscription extends AbstractSubscription { public void add(MessageReference node) throws InterruptedException, IOException { node.incrementReferenceCount(); - if( !isFull() && !isSlaveBroker() ) { + if( !isFull() && !isSlaveBroker()) { + // TODO - if we have already dispatched too many messages to this slow consumer + // should we avoid dispatching and just discard old messages as shown below dispatch(node); } else { - synchronized(matched){ - matched.addLast(node); + synchronized (matched) { + matched.addLast(node); + if (maximumPendingMessages > 0) { + // lets discard old messages as we are a slow consumer + while (matched.size() > maximumPendingMessages) { + MessageReference oldMessage = (MessageReference) matched.removeFirst(); + oldMessage.decrementReferenceCount(); + } + } } } } @@ -122,6 +132,18 @@ public class TopicSubscription extends AbstractSubscription { return delivered; } + public int getMaximumPendingMessages() { + return maximumPendingMessages; + } + + /** + * Sets the maximum number of pending messages that can be matched against this consumer + * before old messages are discarded. + */ + public void setMaximumPendingMessages(int maximumPendingMessages) { + this.maximumPendingMessages = maximumPendingMessages; + } + private boolean isFull() { return dispatched-delivered >= info.getPrefetchSize(); }