From 5ff3487b2fc32ca5becdfbf7f5c1c3a5f26a8680 Mon Sep 17 00:00:00 2001 From: "Hiram R. Chirino" Date: Tue, 7 Mar 2006 19:18:28 +0000 Subject: [PATCH] renamed matched to pending. git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@383969 13f79535-47bb-0310-9956-ffa450edef68 --- .../region/DurableTopicSubscription.java | 8 ++-- .../broker/region/PrefetchSubscription.java | 38 +++++++++---------- .../region/QueueBrowserSubscription.java | 4 +- .../broker/region/QueueSubscription.java | 4 +- 4 files changed, 27 insertions(+), 27 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java index e6525d08d5..8392181687 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java @@ -97,12 +97,12 @@ public class DurableTopicSubscription extends PrefetchSubscription { iter.remove(); } - for (Iterator iter = matched.iterator(); iter.hasNext();) { + for (Iterator iter = pending.iterator(); iter.hasNext();) { MessageReference node = (MessageReference) iter.next(); // node.decrementTargetCount(); iter.remove(); } - delivered=0; + prefetchExtension=0; } protected MessageDispatch createMessageDispatch(MessageReference node, Message message) { @@ -156,8 +156,8 @@ public class DurableTopicSubscription extends PrefetchSubscription { " consumer="+info.getConsumerId()+ ", destinations="+destinations.size()+ ", dispatched="+dispatched.size()+ - ", delivered="+this.delivered+ - ", matched="+this.matched.size(); + ", delivered="+this.prefetchExtension+ + ", pending="+this.pending.size(); } public String getClientId() { diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java index b9fd225ec6..84cb74c582 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java @@ -42,17 +42,17 @@ import org.apache.commons.logging.LogFactory; abstract public class PrefetchSubscription extends AbstractSubscription{ static private final Log log=LogFactory.getLog(PrefetchSubscription.class); - final protected LinkedList matched=new LinkedList(); + final protected LinkedList pending=new LinkedList(); final protected LinkedList dispatched=new LinkedList(); - protected int delivered=0; + protected int prefetchExtension=0; int preLoadLimit=1024*100; int preLoadSize=0; boolean dispatching=false; long enqueueCounter; long dispatchCounter; - long aknowledgedCounter; + long dequeueCounter; public PrefetchSubscription(Broker broker,ConnectionContext context,ConsumerInfo info) throws InvalidSelectorException{ @@ -64,15 +64,15 @@ abstract public class PrefetchSubscription extends AbstractSubscription{ if(!isFull()&&!isSlaveBroker()){ dispatch(node); }else{ - synchronized(matched){ - matched.addLast(node); + synchronized(pending){ + pending.addLast(node); } } } public void processMessageDispatchNotification(MessageDispatchNotification mdn){ - synchronized(matched){ - for(Iterator i=matched.iterator();i.hasNext();){ + synchronized(pending){ + for(Iterator i=pending.iterator();i.hasNext();){ MessageReference node=(MessageReference) i.next(); if(node.getMessageId().equals(mdn.getMessageId())){ i.remove(); @@ -106,16 +106,16 @@ abstract public class PrefetchSubscription extends AbstractSubscription{ if(inAckRange){ // Don't remove the nodes until we are committed. if(!context.isInTransaction()){ - aknowledgedCounter++; + dequeueCounter++; iter.remove(); }else{ // setup a Synchronization to remove nodes from the dispatched list. context.getTransaction().addSynchronization(new Synchronization(){ public void afterCommit() throws Exception{ synchronized(PrefetchSubscription.this){ - aknowledgedCounter++; + dequeueCounter++; dispatched.remove(node); - delivered--; + prefetchExtension--; } } }); @@ -124,9 +124,9 @@ abstract public class PrefetchSubscription extends AbstractSubscription{ acknowledge(context,ack,node); if(ack.getLastMessageId().equals(messageId)){ if(context.isInTransaction()) - delivered=Math.max(delivered,index+1); + prefetchExtension=Math.max(prefetchExtension,index+1); else - delivered=Math.max(0,delivered-(index+1)); + prefetchExtension=Math.max(0,prefetchExtension-(index+1)); if(wasFull&&!isFull()){ dispatchMatched(); } @@ -144,7 +144,7 @@ abstract public class PrefetchSubscription extends AbstractSubscription{ for(Iterator iter=dispatched.iterator();iter.hasNext();index++){ final MessageReference node=(MessageReference) iter.next(); if(ack.getLastMessageId().equals(node.getMessageId())){ - delivered=Math.max(delivered,index+1); + prefetchExtension=Math.max(prefetchExtension,index+1); if(wasFull&&!isFull()){ dispatchMatched(); } @@ -184,11 +184,11 @@ abstract public class PrefetchSubscription extends AbstractSubscription{ node.decrementReferenceCount(); } iter.remove(); - aknowledgedCounter++; + dequeueCounter++; index++; acknowledge(context,ack,node); if(ack.getLastMessageId().equals(messageId)){ - delivered=Math.max(0,delivered-(index+1)); + prefetchExtension=Math.max(0,prefetchExtension-(index+1)); if(wasFull&&!isFull()){ dispatchMatched(); } @@ -202,11 +202,11 @@ abstract public class PrefetchSubscription extends AbstractSubscription{ } protected boolean isFull(){ - return dispatched.size()-delivered>=info.getPrefetchSize()||preLoadSize>preLoadLimit; + return dispatched.size()-prefetchExtension>=info.getPrefetchSize()||preLoadSize>preLoadLimit; } synchronized public int getPendingQueueSize(){ - return matched.size(); + return pending.size(); } synchronized public int getDispatchedQueueSize(){ @@ -214,7 +214,7 @@ abstract public class PrefetchSubscription extends AbstractSubscription{ } synchronized public long getDequeueCounter(){ - return aknowledgedCounter; + return dequeueCounter; } synchronized public long getDispatchedCounter() { @@ -230,7 +230,7 @@ abstract public class PrefetchSubscription extends AbstractSubscription{ if(!dispatching){ dispatching=true; try{ - for(Iterator iter=matched.iterator();iter.hasNext()&&!isFull();){ + for(Iterator iter=pending.iterator();iter.hasNext()&&!isFull();){ MessageReference node=(MessageReference) iter.next(); iter.remove(); dispatch(node); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java index 52fcd06822..69409e9b37 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java @@ -45,8 +45,8 @@ public class QueueBrowserSubscription extends PrefetchSubscription { " consumer="+info.getConsumerId()+ ", destinations="+destinations.size()+ ", dispatched="+dispatched.size()+ - ", delivered="+this.delivered+ - ", matched="+this.matched.size(); + ", delivered="+this.prefetchExtension+ + ", pending="+this.pending.size(); } public void browseDone() throws Exception { diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java index 02e097cad4..00e2992417 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java @@ -126,8 +126,8 @@ public class QueueSubscription extends PrefetchSubscription implements LockOwner " consumer="+info.getConsumerId()+ ", destinations="+destinations.size()+ ", dispatched="+dispatched.size()+ - ", delivered="+this.delivered+ - ", matched="+this.matched.size(); + ", delivered="+this.prefetchExtension+ + ", pending="+this.pending.size(); } public int getLockPriority() {