Gaurd access to the pending list better.

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@397985 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2006-04-28 19:13:54 +00:00
parent f5f1366e4b
commit d65ba8034b
4 changed files with 35 additions and 41 deletions

View File

@ -63,10 +63,8 @@ public class DurableTopicSubscription extends PrefetchSubscription {
Topic topic = (Topic) destination; Topic topic = (Topic) destination;
topic.activate(context, this); topic.activate(context, this);
} }
if( !isFull() ) {
dispatchMatched(); dispatchMatched();
} }
}
synchronized public void activate(ConnectionContext context, ConsumerInfo info) throws Exception { synchronized public void activate(ConnectionContext context, ConsumerInfo info) throws Exception {
if( !active ) { if( !active ) {
@ -79,11 +77,9 @@ public class DurableTopicSubscription extends PrefetchSubscription {
topic.activate(context, this); topic.activate(context, this);
} }
} }
if( !isFull() ) {
dispatchMatched(); dispatchMatched();
} }
} }
}
synchronized public void deactivate(boolean keepDurableSubsActive) throws Exception { synchronized public void deactivate(boolean keepDurableSubsActive) throws Exception {
active=false; active=false;
@ -104,7 +100,9 @@ public class DurableTopicSubscription extends PrefetchSubscription {
redeliveredMessages.put(node.getMessageId(), new Integer(1)); redeliveredMessages.put(node.getMessageId(), new Integer(1));
} }
if( keepDurableSubsActive ) { if( keepDurableSubsActive ) {
synchronized(pending) {
pending.addFirst(node); pending.addFirst(node);
}
} else { } else {
node.decrementReferenceCount(); node.decrementReferenceCount();
} }
@ -112,12 +110,14 @@ public class DurableTopicSubscription extends PrefetchSubscription {
} }
if( !keepDurableSubsActive ) { if( !keepDurableSubsActive ) {
synchronized(pending) {
for (Iterator iter = pending.iterator(); iter.hasNext();) { for (Iterator iter = pending.iterator(); iter.hasNext();) {
MessageReference node = (MessageReference) iter.next(); MessageReference node = (MessageReference) iter.next();
node.decrementReferenceCount(); node.decrementReferenceCount();
iter.remove(); iter.remove();
} }
} }
}
prefetchExtension=0; prefetchExtension=0;
} }
@ -171,7 +171,7 @@ public class DurableTopicSubscription extends PrefetchSubscription {
", destinations="+destinations.size()+ ", destinations="+destinations.size()+
", dispatched="+dispatched.size()+ ", dispatched="+dispatched.size()+
", delivered="+this.prefetchExtension+ ", delivered="+this.prefetchExtension+
", pending="+this.pending.size(); ", pending="+getPendingQueueSize();
} }
public String getClientId() { public String getClientId() {
@ -187,11 +187,13 @@ public class DurableTopicSubscription extends PrefetchSubscription {
*/ */
synchronized public void destroy() { synchronized public void destroy() {
synchronized(pending) {
for (Iterator iter = pending.iterator(); iter.hasNext();) { for (Iterator iter = pending.iterator(); iter.hasNext();) {
MessageReference node = (MessageReference) iter.next(); MessageReference node = (MessageReference) iter.next();
node.decrementReferenceCount(); node.decrementReferenceCount();
} }
pending.clear(); pending.clear();
}
for (Iterator iter = dispatched.iterator(); iter.hasNext();) { for (Iterator iter = dispatched.iterator(); iter.hasNext();) {
MessageReference node = (MessageReference) iter.next(); MessageReference node = (MessageReference) iter.next();

View File

@ -94,7 +94,6 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
synchronized public void acknowledge(final ConnectionContext context,final MessageAck ack) throws Exception{ synchronized public void acknowledge(final ConnectionContext context,final MessageAck ack) throws Exception{
// Handle the standard acknowledgment case. // Handle the standard acknowledgment case.
boolean wasFull=isFull();
if(ack.isStandardAck()){ if(ack.isStandardAck()){
// Acknowledge all dispatched messages up till the message id of the acknowledgment. // Acknowledge all dispatched messages up till the message id of the acknowledgment.
int index=0; int index=0;
@ -129,9 +128,7 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
prefetchExtension=Math.max(prefetchExtension,index+1); prefetchExtension=Math.max(prefetchExtension,index+1);
else else
prefetchExtension=Math.max(0,prefetchExtension-(index+1)); prefetchExtension=Math.max(0,prefetchExtension-(index+1));
if(wasFull&&!isFull()){
dispatchMatched(); dispatchMatched();
}
return; return;
}else{ }else{
// System.out.println("no match: "+ack.getLastMessageId()+","+messageId); // System.out.println("no match: "+ack.getLastMessageId()+","+messageId);
@ -147,9 +144,7 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
final MessageReference node=(MessageReference) iter.next(); final MessageReference node=(MessageReference) iter.next();
if(ack.getLastMessageId().equals(node.getMessageId())){ if(ack.getLastMessageId().equals(node.getMessageId())){
prefetchExtension=Math.max(prefetchExtension,index+1); prefetchExtension=Math.max(prefetchExtension,index+1);
if(wasFull&&!isFull()){
dispatchMatched(); dispatchMatched();
}
return; return;
} }
} }
@ -176,9 +171,7 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
acknowledge(context,ack,node); acknowledge(context,ack,node);
if(ack.getLastMessageId().equals(messageId)){ if(ack.getLastMessageId().equals(messageId)){
prefetchExtension=Math.max(0,prefetchExtension-(index+1)); prefetchExtension=Math.max(0,prefetchExtension-(index+1));
if(wasFull&&!isFull()){
dispatchMatched(); dispatchMatched();
}
return; return;
} }
} }
@ -226,9 +219,11 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
return (dispatched.size()-prefetchExtension) >= (info.getPrefetchSize() *.9); return (dispatched.size()-prefetchExtension) >= (info.getPrefetchSize() *.9);
} }
synchronized public int getPendingQueueSize(){ public int getPendingQueueSize(){
synchronized(pending) {
return pending.size(); return pending.size();
} }
}
synchronized public int getDispatchedQueueSize(){ synchronized public int getDispatchedQueueSize(){
return dispatched.size(); return dispatched.size();
@ -312,11 +307,9 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
} }
synchronized protected void onDispatch(final MessageReference node,final Message message){ synchronized protected void onDispatch(final MessageReference node,final Message message){
boolean wasFull=isFull();
if(node.getRegionDestination()!=null){ if(node.getRegionDestination()!=null){
node.getRegionDestination().getDestinationStatistics().onMessageDequeue(message); node.getRegionDestination().getDestinationStatistics().onMessageDequeue(message);
context.getConnection().getStatistics().onMessageDequeue(message); context.getConnection().getStatistics().onMessageDequeue(message);
if(wasFull&&!isFull()){
try{ try{
dispatchMatched(); dispatchMatched();
}catch(IOException e){ }catch(IOException e){
@ -324,7 +317,6 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
} }
} }
} }
}
/** /**
* inform the MessageConsumer on the client to change it's prefetch * inform the MessageConsumer on the client to change it's prefetch

View File

@ -47,7 +47,7 @@ public class QueueBrowserSubscription extends QueueSubscription {
", destinations="+destinations.size()+ ", destinations="+destinations.size()+
", dispatched="+dispatched.size()+ ", dispatched="+dispatched.size()+
", delivered="+this.prefetchExtension+ ", delivered="+this.prefetchExtension+
", pending="+this.pending.size(); ", pending="+getPendingQueueSize();
} }
public void browseDone() throws Exception { public void browseDone() throws Exception {

View File

@ -125,7 +125,7 @@ public class QueueSubscription extends PrefetchSubscription implements LockOwner
", destinations="+destinations.size()+ ", destinations="+destinations.size()+
", dispatched="+dispatched.size()+ ", dispatched="+dispatched.size()+
", delivered="+this.prefetchExtension+ ", delivered="+this.prefetchExtension+
", pending="+this.pending.size(); ", pending="+getPendingQueueSize();
} }
public int getLockPriority() { public int getLockPriority() {