diff --git a/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java index e39cdd5c08..fb9a20dc1d 100644 --- a/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java @@ -1074,7 +1074,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br } if (configuration.isDecreaseNetworkConsumerPriority()) { - byte priority = ConsumerInfo.NETWORK_CONSUMER_PRIORITY; + byte priority = (byte) configuration.getConsumerPriorityBase(); if (info.getBrokerPath() != null && info.getBrokerPath().length > 1) { // The longer the path to the consumer, the less it's consumer priority. priority -= info.getBrokerPath().length + 1; @@ -1102,9 +1102,6 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br } catch (IOException e) { LOG.error("Failed to create DemandSubscription ", e); } - if (result != null) { - result.getLocalInfo().setPriority(ConsumerInfo.NETWORK_CONSUMER_PRIORITY); - } return result; } diff --git a/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java b/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java index a0b174c855..92eb9b8aef 100644 --- a/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java @@ -18,6 +18,7 @@ package org.apache.activemq.network; import java.util.List; import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ConsumerInfo; /** * Configuration for a NetworkBridge @@ -29,6 +30,7 @@ public class NetworkBridgeConfiguration { private boolean dynamicOnly; private boolean dispatchAsync = true; private boolean decreaseNetworkConsumerPriority; + private int consumerPriorityBase = ConsumerInfo.NETWORK_CONSUMER_PRIORITY; private boolean duplex; private boolean bridgeTempDestinations = true; private int prefetchSize = 1000; @@ -316,4 +318,17 @@ public class NetworkBridgeConfiguration { public void setAlwaysSyncSend(boolean alwaysSyncSend) { this.alwaysSyncSend = alwaysSyncSend; } + + public int getConsumerPriorityBase() { + return consumerPriorityBase; + } + + /** + * @param consumerPriorityBase , default -5. Sets the starting priority + * for consumers. This base value will be decremented by the length of the + * broker path when decreaseNetworkConsumerPriority is set. + */ + public void setConsumerPriorityBase(int consumerPriorityBase) { + this.consumerPriorityBase = consumerPriorityBase; + } } diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/NoDuplicateOnTopicNetworkTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/NoDuplicateOnTopicNetworkTest.java index ffff3f31df..2aa614d9c1 100644 --- a/activemq-core/src/test/java/org/apache/activemq/usecases/NoDuplicateOnTopicNetworkTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/NoDuplicateOnTopicNetworkTest.java @@ -41,11 +41,15 @@ import junit.framework.Test; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.CombinationTestSupport; import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.Destination; +import org.apache.activemq.broker.region.RegionBroker; +import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.policy.DispatchPolicy; import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.broker.region.policy.PriorityNetworkDispatchPolicy; import org.apache.activemq.broker.region.policy.SimpleDispatchPolicy; +import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.network.NetworkConnector; import org.apache.activemq.util.Wait; @@ -61,6 +65,7 @@ public class NoDuplicateOnTopicNetworkTest extends CombinationTestSupport { private static final String BROKER_2 = "tcp://localhost:61636"; private static final String BROKER_3 = "tcp://localhost:61646"; private final static String TOPIC_NAME = "broadcast"; + private static byte BASE_PRIORITY = -20; private BrokerService broker1; private BrokerService broker2; private BrokerService broker3; @@ -128,7 +133,8 @@ public class NoDuplicateOnTopicNetworkTest extends CombinationTestSupport { networkConnector.setDynamicOnly(dynamicOnly); networkConnector.setNetworkTTL(ttl); networkConnector.setSuppressDuplicateTopicSubscriptions(suppressDuplicateTopicSubs); - + networkConnector.setConsumerPriorityBase(BASE_PRIORITY); + networkConnector.addStaticallyIncludedDestination(new ActiveMQTopic("BeStaticallyIncluded")); PolicyMap policyMap = new PolicyMap(); PolicyEntry policy = new PolicyEntry(); @@ -196,7 +202,31 @@ public class NoDuplicateOnTopicNetworkTest extends CombinationTestSupport { // ensure subscription has percolated though the network Thread.sleep(2000); - + + // verify network consumer priority + final RegionBroker regionBroker = (RegionBroker)broker1.getRegionBroker(); + assertTrue("Found network destination with priority as expected", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + Map destinationMap = regionBroker.getTopicRegion().getDestinationMap(); + LOG.info("destinations: " + destinationMap.keySet()); + boolean found = false; + for (Destination destination : destinationMap.values()) { + List subscriptions = destination.getConsumers(); + LOG.info(destination + " subscriptions: " + subscriptions); + for (Subscription subscription : subscriptions) { + if (subscription.getConsumerInfo().isNetworkSubscription()) { + LOG.info("subscription: " + subscription + ", priority: " + subscription.getConsumerInfo().getPriority()); + assertTrue("priority is < our base: " + subscription.getConsumerInfo().getPriority(), + subscription.getConsumerInfo().getPriority() <= BASE_PRIORITY); + found = true; + } + } + } + return found; + } + })); + producerThread.start(); LOG.info("Started Producer"); producerThread.join();