If a topic consumer was hung up, it would eventually stop the producers since the broker memory limit would be reached.

The problem was if the consumer was killed, the broker memory would not get freed up and so the producer would remain blocked.
When a subscription is removed, the memory of the pending messages are now released.

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@394707 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2006-04-17 15:32:28 +00:00
parent 91720daee7
commit c46562ba7c
6 changed files with 54 additions and 7 deletions

View File

@ -180,9 +180,11 @@ abstract public class AbstractRegion implements Region {
}
destroySubscription(sub);
}
protected void destroySubscription(Subscription sub) {
sub.destroy();
}
public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {

View File

@ -181,5 +181,24 @@ public class DurableTopicSubscription extends PrefetchSubscription {
public SubscriptionKey getSubscriptionKey() {
return subscriptionKey;
}
/**
* Release any references that we are holding.
*/
synchronized public void destroy() {
for (Iterator iter = pending.iterator(); iter.hasNext();) {
MessageReference node = (MessageReference) iter.next();
node.decrementReferenceCount();
}
pending.clear();
for (Iterator iter = dispatched.iterator(); iter.hasNext();) {
MessageReference node = (MessageReference) iter.next();
node.decrementReferenceCount();
}
dispatched.clear();
}
}

View File

@ -372,5 +372,4 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
protected void acknowledge(ConnectionContext context,final MessageAck ack,final MessageReference node)
throws IOException{}
}

View File

@ -28,6 +28,7 @@ import org.apache.activemq.transaction.Synchronization;
import javax.jms.InvalidSelectorException;
import java.io.IOException;
import java.util.Iterator;
public class QueueSubscription extends PrefetchSubscription implements LockOwner {
@ -184,4 +185,9 @@ public class QueueSubscription extends PrefetchSubscription implements LockOwner
}
}
/**
*/
synchronized public void destroy() {
}
}

View File

@ -170,4 +170,10 @@ public interface Subscription {
*
*/
public void optimizePrefetch();
/**
* Called when the subscription is destroyed.
*/
public void destroy();
}

View File

@ -53,7 +53,8 @@ public class TopicSubscription extends AbstractSubscription{
private MessageEvictionStrategy messageEvictionStrategy = new OldestMessageEvictionStrategy();
private int discarded = 0;
private final Object matchedListMutex=new Object();
long enqueueCounter;
private final AtomicLong enqueueCounter = new AtomicLong(0);
private final AtomicLong dequeueCounter = new AtomicLong(0);
public TopicSubscription(Broker broker,ConnectionContext context,ConsumerInfo info,UsageManager usageManager)
throws InvalidSelectorException{
@ -62,8 +63,10 @@ public class TopicSubscription extends AbstractSubscription{
}
public void add(MessageReference node) throws InterruptedException,IOException{
enqueueCounter++;
enqueueCounter.incrementAndGet();
node.incrementReferenceCount();
if(!isFull()&&!isSlaveBroker()){
optimizePrefetch();
// if maximumPendingMessages is set we will only discard messages which
@ -131,6 +134,7 @@ public class TopicSubscription extends AbstractSubscription{
}
synchronized public void acknowledge(final ConnectionContext context,final MessageAck ack) throws Exception{
// Handle the standard acknowledgment case.
boolean wasFull=isFull();
if(ack.isStandardAck()||ack.isPoisonAck()){
@ -138,11 +142,13 @@ public class TopicSubscription extends AbstractSubscription{
delivered.addAndGet(ack.getMessageCount());
context.getTransaction().addSynchronization(new Synchronization(){
public void afterCommit() throws Exception{
dequeueCounter.addAndGet(ack.getMessageCount());
dispatched.addAndGet(-ack.getMessageCount());
delivered.set(Math.max(0,delivered.get()-ack.getMessageCount()));
}
});
}else{
dequeueCounter.addAndGet(ack.getMessageCount());
dispatched.addAndGet(-ack.getMessageCount());
delivered.set(Math.max(0,delivered.get()-ack.getMessageCount()));
}
@ -178,14 +184,13 @@ public class TopicSubscription extends AbstractSubscription{
}
public long getEnqueueCounter() {
return enqueueCounter;
return enqueueCounter.get();
}
public long getDequeueCounter(){
return delivered.get();
return dequeueCounter.get();
}
/**
* @return the number of messages discarded due to being a slow consumer
*/
@ -315,4 +320,14 @@ public class TopicSubscription extends AbstractSubscription{
+", dispatched="+getDispatchedQueueSize()+", delivered="+getDequeueCounter()+", matched="+matched()+", discarded="+discarded();
}
public void destroy() {
synchronized(matchedListMutex){
for (Iterator iter = matched.iterator(); iter.hasNext();) {
MessageReference node = (MessageReference) iter.next();
node.decrementReferenceCount();
}
matched.clear();
}
}
}