diff --git a/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java b/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java index 6e390b1633..bce72595a6 100755 --- a/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java @@ -81,6 +81,7 @@ public class DemandForwardingBridge implements Bridge{ BrokerId localBrokerId; BrokerId remoteBrokerId; private Object brokerInfoMutex = new Object(); + private static class DemandSubscription{ ConsumerInfo remoteInfo; ConsumerInfo localInfo; @@ -91,11 +92,13 @@ public class DemandForwardingBridge implements Bridge{ localInfo=info.copy(); } } + ConcurrentHashMap subscriptionMapByLocalId=new ConcurrentHashMap(); ConcurrentHashMap subscriptionMapByRemoteId=new ConcurrentHashMap(); protected final BrokerId localBrokerPath[]=new BrokerId[] { null }; protected final BrokerId remoteBrokerPath[]=new BrokerId[] { null }; private CountDownLatch startedLatch = new CountDownLatch(2); + private boolean decreaseNetowrkConsumerPriority; public DemandForwardingBridge(Transport localBroker,Transport remoteBroker){ this.localBroker=localBroker; @@ -289,12 +292,16 @@ public class DemandForwardingBridge implements Bridge{ .getNextSequenceId())); sub.localInfo.setDispatchAsync(dispatchAsync); sub.localInfo.setPrefetchSize(prefetchSize); - byte priority=ConsumerInfo.NETWORK_CONSUMER_PRIORITY; - if(priority>Byte.MIN_VALUE&&info.getBrokerPath()!=null&&info.getBrokerPath().length>1){ - // The longer the path to the consumer, the less it's consumer priority. - priority-=info.getBrokerPath().length+1; + + if( decreaseNetowrkConsumerPriority ) { + byte priority=ConsumerInfo.NETWORK_CONSUMER_PRIORITY; + if(priority>Byte.MIN_VALUE&&info.getBrokerPath()!=null&&info.getBrokerPath().length>1){ + // The longer the path to the consumer, the less it's consumer priority. + priority-=info.getBrokerPath().length+1; + } + sub.localInfo.setPriority(priority); } - sub.localInfo.setPriority(priority); + subscriptionMapByLocalId.put(sub.localInfo.getConsumerId(),sub); subscriptionMapByRemoteId.put(sub.remoteInfo.getConsumerId(),sub); sub.localInfo.setBrokerPath(info.getBrokerPath()); @@ -473,4 +480,12 @@ public class DemandForwardingBridge implements Bridge{ private void waitStarted() throws InterruptedException { startedLatch.await(); } + + public boolean isDecreaseNetowrkConsumerPriority() { + return decreaseNetowrkConsumerPriority; + } + + public void setDecreaseNetowrkConsumerPriority(boolean decreaseNetowrkConsumerPriority) { + this.decreaseNetowrkConsumerPriority = decreaseNetowrkConsumerPriority; + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java b/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java index ae659632fa..41aa8a1110 100644 --- a/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java @@ -22,7 +22,6 @@ import java.net.URISyntaxException; import java.util.Set; import org.apache.activemq.Service; -import org.apache.activemq.broker.BrokerService; import org.apache.activemq.command.DiscoveryEvent; import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.TransportFactory; @@ -49,8 +48,8 @@ public class NetworkConnector implements Service, DiscoveryListener { private ConcurrentHashMap bridges = new ConcurrentHashMap(); private Set durableDestinations; - boolean failover=true; - + private boolean failover=true; + private boolean decreaseNetowrkConsumerPriority; public NetworkConnector(){ @@ -196,6 +195,7 @@ public class NetworkConnector implements Service, DiscoveryListener { } } }; + result.setDecreaseNetowrkConsumerPriority(isDecreaseNetowrkConsumerPriority()); result.setLocalBrokerName(brokerName); return result; } @@ -242,4 +242,14 @@ public class NetworkConnector implements Service, DiscoveryListener { this.durableDestinations=durableDestinations; } + + public boolean isDecreaseNetowrkConsumerPriority() { + return decreaseNetowrkConsumerPriority; + } + + + public void setDecreaseNetowrkConsumerPriority(boolean decreaseNetowrkConsumerPriority) { + this.decreaseNetowrkConsumerPriority = decreaseNetowrkConsumerPriority; + } + }