git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@378145 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2006-02-16 02:49:48 +00:00
parent 57406ceb78
commit 896324436a
2 changed files with 33 additions and 8 deletions

View File

@ -81,6 +81,7 @@ public class DemandForwardingBridge implements Bridge{
BrokerId localBrokerId;
BrokerId remoteBrokerId;
private Object brokerInfoMutex = new Object();
private static class DemandSubscription{
ConsumerInfo remoteInfo;
ConsumerInfo localInfo;
@ -91,11 +92,13 @@ public class DemandForwardingBridge implements Bridge{
localInfo=info.copy();
}
}
ConcurrentHashMap subscriptionMapByLocalId=new ConcurrentHashMap();
ConcurrentHashMap subscriptionMapByRemoteId=new ConcurrentHashMap();
protected final BrokerId localBrokerPath[]=new BrokerId[] { null };
protected final BrokerId remoteBrokerPath[]=new BrokerId[] { null };
private CountDownLatch startedLatch = new CountDownLatch(2);
private boolean decreaseNetowrkConsumerPriority;
public DemandForwardingBridge(Transport localBroker,Transport remoteBroker){
this.localBroker=localBroker;
@ -289,12 +292,16 @@ public class DemandForwardingBridge implements Bridge{
.getNextSequenceId()));
sub.localInfo.setDispatchAsync(dispatchAsync);
sub.localInfo.setPrefetchSize(prefetchSize);
byte priority=ConsumerInfo.NETWORK_CONSUMER_PRIORITY;
if(priority>Byte.MIN_VALUE&&info.getBrokerPath()!=null&&info.getBrokerPath().length>1){
// The longer the path to the consumer, the less it's consumer priority.
priority-=info.getBrokerPath().length+1;
if( decreaseNetowrkConsumerPriority ) {
byte priority=ConsumerInfo.NETWORK_CONSUMER_PRIORITY;
if(priority>Byte.MIN_VALUE&&info.getBrokerPath()!=null&&info.getBrokerPath().length>1){
// The longer the path to the consumer, the less it's consumer priority.
priority-=info.getBrokerPath().length+1;
}
sub.localInfo.setPriority(priority);
}
sub.localInfo.setPriority(priority);
subscriptionMapByLocalId.put(sub.localInfo.getConsumerId(),sub);
subscriptionMapByRemoteId.put(sub.remoteInfo.getConsumerId(),sub);
sub.localInfo.setBrokerPath(info.getBrokerPath());
@ -473,4 +480,12 @@ public class DemandForwardingBridge implements Bridge{
private void waitStarted() throws InterruptedException {
startedLatch.await();
}
public boolean isDecreaseNetowrkConsumerPriority() {
return decreaseNetowrkConsumerPriority;
}
public void setDecreaseNetowrkConsumerPriority(boolean decreaseNetowrkConsumerPriority) {
this.decreaseNetowrkConsumerPriority = decreaseNetowrkConsumerPriority;
}
}

View File

@ -22,7 +22,6 @@ import java.net.URISyntaxException;
import java.util.Set;
import org.apache.activemq.Service;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.DiscoveryEvent;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory;
@ -49,8 +48,8 @@ public class NetworkConnector implements Service, DiscoveryListener {
private ConcurrentHashMap bridges = new ConcurrentHashMap();
private Set durableDestinations;
boolean failover=true;
private boolean failover=true;
private boolean decreaseNetowrkConsumerPriority;
public NetworkConnector(){
@ -196,6 +195,7 @@ public class NetworkConnector implements Service, DiscoveryListener {
}
}
};
result.setDecreaseNetowrkConsumerPriority(isDecreaseNetowrkConsumerPriority());
result.setLocalBrokerName(brokerName);
return result;
}
@ -242,4 +242,14 @@ public class NetworkConnector implements Service, DiscoveryListener {
this.durableDestinations=durableDestinations;
}
public boolean isDecreaseNetowrkConsumerPriority() {
return decreaseNetowrkConsumerPriority;
}
public void setDecreaseNetowrkConsumerPriority(boolean decreaseNetowrkConsumerPriority) {
this.decreaseNetowrkConsumerPriority = decreaseNetowrkConsumerPriority;
}
}