added the ability to discard old messages for non-durable topics if a maximum number of pending messages is reached

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@382421 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
James Strachan 2006-03-02 15:56:30 +00:00
parent f1e4459644
commit 0d5de13537
1 changed files with 25 additions and 3 deletions

View File

@ -42,6 +42,7 @@ public class TopicSubscription extends AbstractSubscription {
final protected UsageManager usageManager; final protected UsageManager usageManager;
protected int dispatched=0; protected int dispatched=0;
protected int delivered=0; protected int delivered=0;
private int maximumPendingMessages = 0;
public TopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info, UsageManager usageManager) throws InvalidSelectorException { public TopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info, UsageManager usageManager) throws InvalidSelectorException {
super(broker,context, info); super(broker,context, info);
@ -50,11 +51,20 @@ public class TopicSubscription extends AbstractSubscription {
public void add(MessageReference node) throws InterruptedException, IOException { public void add(MessageReference node) throws InterruptedException, IOException {
node.incrementReferenceCount(); node.incrementReferenceCount();
if( !isFull() && !isSlaveBroker() ) { if( !isFull() && !isSlaveBroker()) {
// TODO - if we have already dispatched too many messages to this slow consumer
// should we avoid dispatching and just discard old messages as shown below
dispatch(node); dispatch(node);
} else { } else {
synchronized(matched){ synchronized (matched) {
matched.addLast(node); matched.addLast(node);
if (maximumPendingMessages > 0) {
// lets discard old messages as we are a slow consumer
while (matched.size() > maximumPendingMessages) {
MessageReference oldMessage = (MessageReference) matched.removeFirst();
oldMessage.decrementReferenceCount();
}
}
} }
} }
} }
@ -122,6 +132,18 @@ public class TopicSubscription extends AbstractSubscription {
return delivered; return delivered;
} }
public int getMaximumPendingMessages() {
return maximumPendingMessages;
}
/**
* Sets the maximum number of pending messages that can be matched against this consumer
* before old messages are discarded.
*/
public void setMaximumPendingMessages(int maximumPendingMessages) {
this.maximumPendingMessages = maximumPendingMessages;
}
private boolean isFull() { private boolean isFull() {
return dispatched-delivered >= info.getPrefetchSize(); return dispatched-delivered >= info.getPrefetchSize();
} }