removed the preLoadLimit logic since it was causing the RoundRobingDispatchPolicyTests to fail intermitently. The preLoadLimit was an additional prefetch limit, and when it kicked in,

it would cause the round robin distribution to stop sending round robin.

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@387562 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2006-03-21 16:08:40 +00:00
parent 1b60ebbea5
commit 4757b273d1
1 changed files with 4 additions and 16 deletions

View File

@ -46,8 +46,6 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
final protected LinkedList dispatched=new LinkedList(); final protected LinkedList dispatched=new LinkedList();
protected int prefetchExtension=0; protected int prefetchExtension=0;
int preLoadLimit=1024*100;
int preLoadSize=0;
boolean dispatching=false; boolean dispatching=false;
long enqueueCounter; long enqueueCounter;
@ -65,6 +63,8 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
dispatch(node); dispatch(node);
}else{ }else{
synchronized(pending){ synchronized(pending){
if( pending.isEmpty() )
log.info("Prefetch limit.");
pending.addLast(node); pending.addLast(node);
} }
} }
@ -79,7 +79,6 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
try{ try{
MessageDispatch md=createMessageDispatch(node,node.getMessage()); MessageDispatch md=createMessageDispatch(node,node.getMessage());
dispatched.addLast(node); dispatched.addLast(node);
incrementPreloadSize(node.getSize());
node.decrementReferenceCount(); node.decrementReferenceCount();
}catch(Exception e){ }catch(Exception e){
log.error("Problem processing MessageDispatchNotification: "+mdn,e); log.error("Problem processing MessageDispatchNotification: "+mdn,e);
@ -101,6 +100,7 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
final MessageReference node=(MessageReference) iter.next(); final MessageReference node=(MessageReference) iter.next();
MessageId messageId=node.getMessageId(); MessageId messageId=node.getMessageId();
if(ack.getFirstMessageId()==null||ack.getFirstMessageId().equals(messageId)){ if(ack.getFirstMessageId()==null||ack.getFirstMessageId().equals(messageId)){
System.out.println("in range: "+messageId);
inAckRange=true; inAckRange=true;
} }
if(inAckRange){ if(inAckRange){
@ -202,7 +202,7 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
} }
protected boolean isFull(){ protected boolean isFull(){
return dispatched.size()-prefetchExtension>=info.getPrefetchSize()||preLoadSize>preLoadLimit; return dispatched.size()-prefetchExtension>=info.getPrefetchSize();
} }
synchronized public int getPendingQueueSize(){ synchronized public int getPendingQueueSize(){
@ -252,7 +252,6 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
dispatchCounter++; dispatchCounter++;
MessageDispatch md=createMessageDispatch(node,message); MessageDispatch md=createMessageDispatch(node,message);
dispatched.addLast(node); dispatched.addLast(node);
incrementPreloadSize(node.getMessage().getSize());
if(info.isDispatchAsync()){ if(info.isDispatchAsync()){
md.setConsumer(new Runnable(){ md.setConsumer(new Runnable(){
public void run(){ public void run(){
@ -275,7 +274,6 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
synchronized private void onDispatch(final MessageReference node,final Message message){ synchronized private void onDispatch(final MessageReference node,final Message message){
boolean wasFull=isFull(); boolean wasFull=isFull();
decrementPreloadSize(message.getSize());
node.decrementReferenceCount(); node.decrementReferenceCount();
if(node.getRegionDestination()!=null){ if(node.getRegionDestination()!=null){
node.getRegionDestination().getDestinationStatistics().onMessageDequeue(message); node.getRegionDestination().getDestinationStatistics().onMessageDequeue(message);
@ -290,16 +288,6 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
} }
} }
private int incrementPreloadSize(int size){
preLoadSize+=size;
return preLoadSize;
}
private int decrementPreloadSize(int size){
preLoadSize-=size;
return preLoadSize;
}
/** /**
* @param node * @param node
* @param message * @param message