renamed matched to pending.

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@383969 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2006-03-07 19:18:28 +00:00
parent 3b0377cfaf
commit 5ff3487b2f
4 changed files with 27 additions and 27 deletions

View File

@ -97,12 +97,12 @@ public class DurableTopicSubscription extends PrefetchSubscription {
iter.remove(); iter.remove();
} }
for (Iterator iter = matched.iterator(); iter.hasNext();) { for (Iterator iter = pending.iterator(); iter.hasNext();) {
MessageReference node = (MessageReference) iter.next(); MessageReference node = (MessageReference) iter.next();
// node.decrementTargetCount(); // node.decrementTargetCount();
iter.remove(); iter.remove();
} }
delivered=0; prefetchExtension=0;
} }
protected MessageDispatch createMessageDispatch(MessageReference node, Message message) { protected MessageDispatch createMessageDispatch(MessageReference node, Message message) {
@ -156,8 +156,8 @@ public class DurableTopicSubscription extends PrefetchSubscription {
" consumer="+info.getConsumerId()+ " consumer="+info.getConsumerId()+
", destinations="+destinations.size()+ ", destinations="+destinations.size()+
", dispatched="+dispatched.size()+ ", dispatched="+dispatched.size()+
", delivered="+this.delivered+ ", delivered="+this.prefetchExtension+
", matched="+this.matched.size(); ", pending="+this.pending.size();
} }
public String getClientId() { public String getClientId() {

View File

@ -42,17 +42,17 @@ import org.apache.commons.logging.LogFactory;
abstract public class PrefetchSubscription extends AbstractSubscription{ abstract public class PrefetchSubscription extends AbstractSubscription{
static private final Log log=LogFactory.getLog(PrefetchSubscription.class); static private final Log log=LogFactory.getLog(PrefetchSubscription.class);
final protected LinkedList matched=new LinkedList(); final protected LinkedList pending=new LinkedList();
final protected LinkedList dispatched=new LinkedList(); final protected LinkedList dispatched=new LinkedList();
protected int delivered=0; protected int prefetchExtension=0;
int preLoadLimit=1024*100; int preLoadLimit=1024*100;
int preLoadSize=0; int preLoadSize=0;
boolean dispatching=false; boolean dispatching=false;
long enqueueCounter; long enqueueCounter;
long dispatchCounter; long dispatchCounter;
long aknowledgedCounter; long dequeueCounter;
public PrefetchSubscription(Broker broker,ConnectionContext context,ConsumerInfo info) public PrefetchSubscription(Broker broker,ConnectionContext context,ConsumerInfo info)
throws InvalidSelectorException{ throws InvalidSelectorException{
@ -64,15 +64,15 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
if(!isFull()&&!isSlaveBroker()){ if(!isFull()&&!isSlaveBroker()){
dispatch(node); dispatch(node);
}else{ }else{
synchronized(matched){ synchronized(pending){
matched.addLast(node); pending.addLast(node);
} }
} }
} }
public void processMessageDispatchNotification(MessageDispatchNotification mdn){ public void processMessageDispatchNotification(MessageDispatchNotification mdn){
synchronized(matched){ synchronized(pending){
for(Iterator i=matched.iterator();i.hasNext();){ for(Iterator i=pending.iterator();i.hasNext();){
MessageReference node=(MessageReference) i.next(); MessageReference node=(MessageReference) i.next();
if(node.getMessageId().equals(mdn.getMessageId())){ if(node.getMessageId().equals(mdn.getMessageId())){
i.remove(); i.remove();
@ -106,16 +106,16 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
if(inAckRange){ if(inAckRange){
// Don't remove the nodes until we are committed. // Don't remove the nodes until we are committed.
if(!context.isInTransaction()){ if(!context.isInTransaction()){
aknowledgedCounter++; dequeueCounter++;
iter.remove(); iter.remove();
}else{ }else{
// setup a Synchronization to remove nodes from the dispatched list. // setup a Synchronization to remove nodes from the dispatched list.
context.getTransaction().addSynchronization(new Synchronization(){ context.getTransaction().addSynchronization(new Synchronization(){
public void afterCommit() throws Exception{ public void afterCommit() throws Exception{
synchronized(PrefetchSubscription.this){ synchronized(PrefetchSubscription.this){
aknowledgedCounter++; dequeueCounter++;
dispatched.remove(node); dispatched.remove(node);
delivered--; prefetchExtension--;
} }
} }
}); });
@ -124,9 +124,9 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
acknowledge(context,ack,node); acknowledge(context,ack,node);
if(ack.getLastMessageId().equals(messageId)){ if(ack.getLastMessageId().equals(messageId)){
if(context.isInTransaction()) if(context.isInTransaction())
delivered=Math.max(delivered,index+1); prefetchExtension=Math.max(prefetchExtension,index+1);
else else
delivered=Math.max(0,delivered-(index+1)); prefetchExtension=Math.max(0,prefetchExtension-(index+1));
if(wasFull&&!isFull()){ if(wasFull&&!isFull()){
dispatchMatched(); dispatchMatched();
} }
@ -144,7 +144,7 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
for(Iterator iter=dispatched.iterator();iter.hasNext();index++){ for(Iterator iter=dispatched.iterator();iter.hasNext();index++){
final MessageReference node=(MessageReference) iter.next(); final MessageReference node=(MessageReference) iter.next();
if(ack.getLastMessageId().equals(node.getMessageId())){ if(ack.getLastMessageId().equals(node.getMessageId())){
delivered=Math.max(delivered,index+1); prefetchExtension=Math.max(prefetchExtension,index+1);
if(wasFull&&!isFull()){ if(wasFull&&!isFull()){
dispatchMatched(); dispatchMatched();
} }
@ -184,11 +184,11 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
node.decrementReferenceCount(); node.decrementReferenceCount();
} }
iter.remove(); iter.remove();
aknowledgedCounter++; dequeueCounter++;
index++; index++;
acknowledge(context,ack,node); acknowledge(context,ack,node);
if(ack.getLastMessageId().equals(messageId)){ if(ack.getLastMessageId().equals(messageId)){
delivered=Math.max(0,delivered-(index+1)); prefetchExtension=Math.max(0,prefetchExtension-(index+1));
if(wasFull&&!isFull()){ if(wasFull&&!isFull()){
dispatchMatched(); dispatchMatched();
} }
@ -202,11 +202,11 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
} }
protected boolean isFull(){ protected boolean isFull(){
return dispatched.size()-delivered>=info.getPrefetchSize()||preLoadSize>preLoadLimit; return dispatched.size()-prefetchExtension>=info.getPrefetchSize()||preLoadSize>preLoadLimit;
} }
synchronized public int getPendingQueueSize(){ synchronized public int getPendingQueueSize(){
return matched.size(); return pending.size();
} }
synchronized public int getDispatchedQueueSize(){ synchronized public int getDispatchedQueueSize(){
@ -214,7 +214,7 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
} }
synchronized public long getDequeueCounter(){ synchronized public long getDequeueCounter(){
return aknowledgedCounter; return dequeueCounter;
} }
synchronized public long getDispatchedCounter() { synchronized public long getDispatchedCounter() {
@ -230,7 +230,7 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
if(!dispatching){ if(!dispatching){
dispatching=true; dispatching=true;
try{ try{
for(Iterator iter=matched.iterator();iter.hasNext()&&!isFull();){ for(Iterator iter=pending.iterator();iter.hasNext()&&!isFull();){
MessageReference node=(MessageReference) iter.next(); MessageReference node=(MessageReference) iter.next();
iter.remove(); iter.remove();
dispatch(node); dispatch(node);

View File

@ -45,8 +45,8 @@ public class QueueBrowserSubscription extends PrefetchSubscription {
" consumer="+info.getConsumerId()+ " consumer="+info.getConsumerId()+
", destinations="+destinations.size()+ ", destinations="+destinations.size()+
", dispatched="+dispatched.size()+ ", dispatched="+dispatched.size()+
", delivered="+this.delivered+ ", delivered="+this.prefetchExtension+
", matched="+this.matched.size(); ", pending="+this.pending.size();
} }
public void browseDone() throws Exception { public void browseDone() throws Exception {

View File

@ -126,8 +126,8 @@ public class QueueSubscription extends PrefetchSubscription implements LockOwner
" consumer="+info.getConsumerId()+ " consumer="+info.getConsumerId()+
", destinations="+destinations.size()+ ", destinations="+destinations.size()+
", dispatched="+dispatched.size()+ ", dispatched="+dispatched.size()+
", delivered="+this.delivered+ ", delivered="+this.prefetchExtension+
", matched="+this.matched.size(); ", pending="+this.pending.size();
} }
public int getLockPriority() { public int getLockPriority() {