diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java index 1ffcee9f40..cb6dd0af21 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java @@ -118,15 +118,19 @@ public class DurableTopicSubscription extends PrefetchSubscription { } } - if( !keepDurableSubsActive ) { - synchronized(pending) { - pending.reset(); - while(pending.hasNext()) { - MessageReference node = pending.next(); - node.decrementReferenceCount(); - pending.remove(); - } - } + if(!keepDurableSubsActive){ + synchronized(pending){ + try{ + pending.reset(); + while(pending.hasNext()){ + MessageReference node=pending.next(); + node.decrementReferenceCount(); + pending.remove(); + } + }finally{ + pending.release(); + } + } } prefetchExtension=0; } @@ -195,22 +199,24 @@ public class DurableTopicSubscription extends PrefetchSubscription { /** * Release any references that we are holding. */ - public void destroy() { - synchronized(pending) { - pending.reset(); - while(pending.hasNext()) { - MessageReference node = pending.next(); - node.decrementReferenceCount(); - } - pending.clear(); - } - - for (Iterator iter = dispatched.iterator(); iter.hasNext();) { - MessageReference node = (MessageReference) iter.next(); + public void destroy(){ + try{ + synchronized(pending){ + pending.reset(); + while(pending.hasNext()){ + MessageReference node=pending.next(); + node.decrementReferenceCount(); + } + } + }finally{ + pending.release(); + pending.clear(); + } + for(Iterator iter=dispatched.iterator();iter.hasNext();){ + MessageReference node=(MessageReference)iter.next(); node.decrementReferenceCount(); } dispatched.clear(); - } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java index 63120a9a1e..6caab4906a 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java @@ -123,6 +123,7 @@ abstract public class PrefetchSubscription extends AbstractSubscription{ } public void add(MessageReference node) throws Exception{ + try { boolean pendingEmpty = false; synchronized(pending){ pendingEmpty=pending.isEmpty(); @@ -139,21 +140,30 @@ abstract public class PrefetchSubscription extends AbstractSubscription{ pending.addMessageLast(node); } } + }catch(Throwable e) { + e.printStackTrace(); + + } } - public void processMessageDispatchNotification(MessageDispatchNotification mdn) throws Exception { + public void processMessageDispatchNotification(MessageDispatchNotification mdn) throws Exception{ synchronized(pending){ - pending.reset(); - while(pending.hasNext()){ - MessageReference node=pending.next(); - if(node.getMessageId().equals(mdn.getMessageId())){ - pending.remove(); - createMessageDispatch(node,node.getMessage()); - dispatched.addLast(node); - return; + try{ + pending.reset(); + while(pending.hasNext()){ + MessageReference node=pending.next(); + if(node.getMessageId().equals(mdn.getMessageId())){ + pending.remove(); + createMessageDispatch(node,node.getMessage()); + dispatched.addLast(node); + return; + } } + }finally{ + pending.release(); } - throw new JMSException("Slave broker out of sync with master: Dispatched message ("+mdn.getMessageId()+") was not in the pending list: "+pending); + throw new JMSException("Slave broker out of sync with master: Dispatched message ("+mdn.getMessageId() + +") was not in the pending list: "+pending); } } @@ -387,6 +397,7 @@ abstract public class PrefetchSubscription extends AbstractSubscription{ dispatch(node); } }finally{ + pending.release(); dispatching=false; } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java index b094f25475..7cc4a52f2d 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -545,54 +545,58 @@ public class Queue implements Destination, Task { } } } - synchronized (messages) { - messages.reset(); - while(messages.hasNext()) { - try { - MessageReference r = messages.next(); - r.incrementReferenceCount(); - try { - Message m = r.getMessage(); - if (m != null) { - l.add(m); + synchronized(messages){ + try{ + messages.reset(); + while(messages.hasNext()){ + try{ + MessageReference r=messages.next(); + r.incrementReferenceCount(); + try{ + Message m=r.getMessage(); + if(m!=null){ + l.add(m); + } + }finally{ + r.decrementReferenceCount(); } - } - finally { - r.decrementReferenceCount(); + }catch(IOException e){ + log.error("caught an exception brwsing "+this,e); } } - catch (IOException e) { - log.error("caught an exception brwsing " + this,e); - } + }finally{ + messages.release(); } } return (Message[]) l.toArray(new Message[l.size()]); } - public Message getMessage(String messageId) { - synchronized (messages) { - messages.reset(); - while(messages.hasNext()) { - try { - MessageReference r = messages.next(); - if (messageId.equals(r.getMessageId().toString())) { - r.incrementReferenceCount(); - try { - Message m = r.getMessage(); - if (m != null) { - return m; + public Message getMessage(String messageId){ + synchronized(messages){ + try{ + messages.reset(); + while(messages.hasNext()){ + try{ + MessageReference r=messages.next(); + if(messageId.equals(r.getMessageId().toString())){ + r.incrementReferenceCount(); + try{ + Message m=r.getMessage(); + if(m!=null){ + return m; + } + }finally{ + r.decrementReferenceCount(); } + break; } - finally { - r.decrementReferenceCount(); - } - break; + }catch(IOException e){ + log.error("got an exception retrieving message "+messageId); } } - catch (IOException e) { - log.error("got an exception retrieving message " + messageId); - } + }finally{ + messages.release(); } } return null; @@ -868,13 +872,17 @@ public class Queue implements Destination, Task { int count=0; result=new ArrayList(toPageIn); synchronized(messages){ - messages.reset(); - while(messages.hasNext()&&count