From d65ba8034bd10aa45b9b8f0a8cbe72034eb33e67 Mon Sep 17 00:00:00 2001 From: "Hiram R. Chirino" Date: Fri, 28 Apr 2006 19:13:54 +0000 Subject: [PATCH] Gaurd access to the pending list better. git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@397985 13f79535-47bb-0310-9956-ffa450edef68 --- .../region/DurableTopicSubscription.java | 42 ++++++++++--------- .../broker/region/PrefetchSubscription.java | 30 +++++-------- .../region/QueueBrowserSubscription.java | 2 +- .../broker/region/QueueSubscription.java | 2 +- 4 files changed, 35 insertions(+), 41 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java index a3144a00e1..d20954f6a7 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java @@ -63,9 +63,7 @@ public class DurableTopicSubscription extends PrefetchSubscription { Topic topic = (Topic) destination; topic.activate(context, this); } - if( !isFull() ) { - dispatchMatched(); - } + dispatchMatched(); } synchronized public void activate(ConnectionContext context, ConsumerInfo info) throws Exception { @@ -79,9 +77,7 @@ public class DurableTopicSubscription extends PrefetchSubscription { topic.activate(context, this); } } - if( !isFull() ) { - dispatchMatched(); - } + dispatchMatched(); } } @@ -104,7 +100,9 @@ public class DurableTopicSubscription extends PrefetchSubscription { redeliveredMessages.put(node.getMessageId(), new Integer(1)); } if( keepDurableSubsActive ) { - pending.addFirst(node); + synchronized(pending) { + pending.addFirst(node); + } } else { node.decrementReferenceCount(); } @@ -112,11 +110,13 @@ public class DurableTopicSubscription extends PrefetchSubscription { } if( !keepDurableSubsActive ) { - for (Iterator iter = pending.iterator(); iter.hasNext();) { - MessageReference node = (MessageReference) iter.next(); - node.decrementReferenceCount(); - iter.remove(); - } + synchronized(pending) { + for (Iterator iter = pending.iterator(); iter.hasNext();) { + MessageReference node = (MessageReference) iter.next(); + node.decrementReferenceCount(); + iter.remove(); + } + } } prefetchExtension=0; } @@ -171,7 +171,7 @@ public class DurableTopicSubscription extends PrefetchSubscription { ", destinations="+destinations.size()+ ", dispatched="+dispatched.size()+ ", delivered="+this.prefetchExtension+ - ", pending="+this.pending.size(); + ", pending="+getPendingQueueSize(); } public String getClientId() { @@ -186,13 +186,15 @@ public class DurableTopicSubscription extends PrefetchSubscription { * Release any references that we are holding. */ synchronized public void destroy() { - - for (Iterator iter = pending.iterator(); iter.hasNext();) { - MessageReference node = (MessageReference) iter.next(); - node.decrementReferenceCount(); - } - pending.clear(); - + + synchronized(pending) { + for (Iterator iter = pending.iterator(); iter.hasNext();) { + MessageReference node = (MessageReference) iter.next(); + node.decrementReferenceCount(); + } + pending.clear(); + } + for (Iterator iter = dispatched.iterator(); iter.hasNext();) { MessageReference node = (MessageReference) iter.next(); node.decrementReferenceCount(); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java index 88c84790b1..239c557023 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java @@ -94,7 +94,6 @@ abstract public class PrefetchSubscription extends AbstractSubscription{ synchronized public void acknowledge(final ConnectionContext context,final MessageAck ack) throws Exception{ // Handle the standard acknowledgment case. - boolean wasFull=isFull(); if(ack.isStandardAck()){ // Acknowledge all dispatched messages up till the message id of the acknowledgment. int index=0; @@ -129,9 +128,7 @@ abstract public class PrefetchSubscription extends AbstractSubscription{ prefetchExtension=Math.max(prefetchExtension,index+1); else prefetchExtension=Math.max(0,prefetchExtension-(index+1)); - if(wasFull&&!isFull()){ - dispatchMatched(); - } + dispatchMatched(); return; }else{ // System.out.println("no match: "+ack.getLastMessageId()+","+messageId); @@ -147,9 +144,7 @@ abstract public class PrefetchSubscription extends AbstractSubscription{ final MessageReference node=(MessageReference) iter.next(); if(ack.getLastMessageId().equals(node.getMessageId())){ prefetchExtension=Math.max(prefetchExtension,index+1); - if(wasFull&&!isFull()){ - dispatchMatched(); - } + dispatchMatched(); return; } } @@ -176,9 +171,7 @@ abstract public class PrefetchSubscription extends AbstractSubscription{ acknowledge(context,ack,node); if(ack.getLastMessageId().equals(messageId)){ prefetchExtension=Math.max(0,prefetchExtension-(index+1)); - if(wasFull&&!isFull()){ - dispatchMatched(); - } + dispatchMatched(); return; } } @@ -226,8 +219,10 @@ abstract public class PrefetchSubscription extends AbstractSubscription{ return (dispatched.size()-prefetchExtension) >= (info.getPrefetchSize() *.9); } - synchronized public int getPendingQueueSize(){ - return pending.size(); + public int getPendingQueueSize(){ + synchronized(pending) { + return pending.size(); + } } synchronized public int getDispatchedQueueSize(){ @@ -312,16 +307,13 @@ abstract public class PrefetchSubscription extends AbstractSubscription{ } synchronized protected void onDispatch(final MessageReference node,final Message message){ - boolean wasFull=isFull(); if(node.getRegionDestination()!=null){ node.getRegionDestination().getDestinationStatistics().onMessageDequeue(message); context.getConnection().getStatistics().onMessageDequeue(message); - if(wasFull&&!isFull()){ - try{ - dispatchMatched(); - }catch(IOException e){ - context.getConnection().serviceException(e); - } + try{ + dispatchMatched(); + }catch(IOException e){ + context.getConnection().serviceException(e); } } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java index a643b5fe36..4d08573b75 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java @@ -47,7 +47,7 @@ public class QueueBrowserSubscription extends QueueSubscription { ", destinations="+destinations.size()+ ", dispatched="+dispatched.size()+ ", delivered="+this.prefetchExtension+ - ", pending="+this.pending.size(); + ", pending="+getPendingQueueSize(); } public void browseDone() throws Exception { diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java index 1dee6e8658..b7f9cb8cd5 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java @@ -125,7 +125,7 @@ public class QueueSubscription extends PrefetchSubscription implements LockOwner ", destinations="+destinations.size()+ ", dispatched="+dispatched.size()+ ", delivered="+this.prefetchExtension+ - ", pending="+this.pending.size(); + ", pending="+getPendingQueueSize(); } public int getLockPriority() {