disable the use of range acks with network connectors since that could cause the broker to block waiting for messages to be consumed (in the case of big messages being sent).

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@386132 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2006-03-15 19:04:32 +00:00
parent 6994361f80
commit 6a89f08c4f
2 changed files with 31 additions and 18 deletions

View File

@ -375,11 +375,18 @@ public abstract class DemandForwardingBridgeSupport implements Bridge {
serviceLocalException(er.getException());
}
}
int dispatched = sub.incrementDispatched();
if(dispatched>(sub.getLocalInfo().getPrefetchSize()*.75)){
localBroker.oneway(new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,dispatched));
sub.setDispatched(0);
}
// Ack on every message since we don't know if the broker is blocked due to memory
// usage and is waiting for an Ack to un-block him.
localBroker.oneway(new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,1));
// Acking a range is more efficient, but also more prone to locking up a server
// Perhaps doing something like the following should be policy based.
// int dispatched = sub.incrementDispatched();
// if(dispatched>(sub.getLocalInfo().getPrefetchSize()*.75)){
// localBroker.oneway(new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,dispatched));
// sub.setDispatched(0);
// }
}
}else if(command.isBrokerInfo()){
serviceLocalBrokerInfo(command);

View File

@ -218,19 +218,25 @@ public class ForwardingBridge implements Bridge {
remoteBroker.oneway( message );
if( md.getConsumerId().equals(queueConsumerInfo.getConsumerId()) ) {
queueDispatched++;
if( queueDispatched > (queueConsumerInfo.getPrefetchSize()/2) ) {
localBroker.oneway(new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, queueDispatched));
queueDispatched=0;
}
} else {
topicDispatched++;
if( topicDispatched > (topicConsumerInfo.getPrefetchSize()/2) ) {
localBroker.oneway(new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, topicDispatched));
topicDispatched=0;
}
}
// Ack on every message since we don't know if the broker is blocked due to memory
// usage and is waiting for an Ack to un-block him.
localBroker.oneway(new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,1));
// Acking a range is more efficient, but also more prone to locking up a server
// Perhaps doing something like the following should be policy based.
// if( md.getConsumerId().equals(queueConsumerInfo.getConsumerId()) ) {
// queueDispatched++;
// if( queueDispatched > (queueConsumerInfo.getPrefetchSize()/2) ) {
// localBroker.oneway(new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, queueDispatched));
// queueDispatched=0;
// }
// } else {
// topicDispatched++;
// if( topicDispatched > (topicConsumerInfo.getPrefetchSize()/2) ) {
// localBroker.oneway(new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, topicDispatched));
// topicDispatched=0;
// }
// }
} else if(command.isBrokerInfo() ) {
synchronized( this ) {
localBrokerId = ((BrokerInfo)command).getBrokerId();