https://issues.apache.org/jira/browse/AMQ-3558 - Allow the base network consumer priority to be configured on a networkConnector. Addition of consumerPriorityBase atribute to networkConnector configuration, additional test. This allows the base to be configured such that one network connector can have priority over another

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1187461 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2011-10-21 16:46:43 +00:00
parent 85edfb3425
commit 5b796cd8d6
3 changed files with 48 additions and 6 deletions

View File

@ -1074,7 +1074,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
} }
if (configuration.isDecreaseNetworkConsumerPriority()) { if (configuration.isDecreaseNetworkConsumerPriority()) {
byte priority = ConsumerInfo.NETWORK_CONSUMER_PRIORITY; byte priority = (byte) configuration.getConsumerPriorityBase();
if (info.getBrokerPath() != null && info.getBrokerPath().length > 1) { if (info.getBrokerPath() != null && info.getBrokerPath().length > 1) {
// The longer the path to the consumer, the less it's consumer priority. // The longer the path to the consumer, the less it's consumer priority.
priority -= info.getBrokerPath().length + 1; priority -= info.getBrokerPath().length + 1;
@ -1102,9 +1102,6 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
} catch (IOException e) { } catch (IOException e) {
LOG.error("Failed to create DemandSubscription ", e); LOG.error("Failed to create DemandSubscription ", e);
} }
if (result != null) {
result.getLocalInfo().setPriority(ConsumerInfo.NETWORK_CONSUMER_PRIORITY);
}
return result; return result;
} }

View File

@ -18,6 +18,7 @@ package org.apache.activemq.network;
import java.util.List; import java.util.List;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerInfo;
/** /**
* Configuration for a NetworkBridge * Configuration for a NetworkBridge
@ -29,6 +30,7 @@ public class NetworkBridgeConfiguration {
private boolean dynamicOnly; private boolean dynamicOnly;
private boolean dispatchAsync = true; private boolean dispatchAsync = true;
private boolean decreaseNetworkConsumerPriority; private boolean decreaseNetworkConsumerPriority;
private int consumerPriorityBase = ConsumerInfo.NETWORK_CONSUMER_PRIORITY;
private boolean duplex; private boolean duplex;
private boolean bridgeTempDestinations = true; private boolean bridgeTempDestinations = true;
private int prefetchSize = 1000; private int prefetchSize = 1000;
@ -316,4 +318,17 @@ public class NetworkBridgeConfiguration {
public void setAlwaysSyncSend(boolean alwaysSyncSend) { public void setAlwaysSyncSend(boolean alwaysSyncSend) {
this.alwaysSyncSend = alwaysSyncSend; this.alwaysSyncSend = alwaysSyncSend;
} }
public int getConsumerPriorityBase() {
return consumerPriorityBase;
}
/**
* @param consumerPriorityBase , default -5. Sets the starting priority
* for consumers. This base value will be decremented by the length of the
* broker path when decreaseNetworkConsumerPriority is set.
*/
public void setConsumerPriorityBase(int consumerPriorityBase) {
this.consumerPriorityBase = consumerPriorityBase;
}
} }

View File

@ -41,11 +41,15 @@ import junit.framework.Test;
import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.CombinationTestSupport; import org.apache.activemq.CombinationTestSupport;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.policy.DispatchPolicy; import org.apache.activemq.broker.region.policy.DispatchPolicy;
import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.PriorityNetworkDispatchPolicy; import org.apache.activemq.broker.region.policy.PriorityNetworkDispatchPolicy;
import org.apache.activemq.broker.region.policy.SimpleDispatchPolicy; import org.apache.activemq.broker.region.policy.SimpleDispatchPolicy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.network.NetworkConnector; import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.util.Wait; import org.apache.activemq.util.Wait;
@ -61,6 +65,7 @@ public class NoDuplicateOnTopicNetworkTest extends CombinationTestSupport {
private static final String BROKER_2 = "tcp://localhost:61636"; private static final String BROKER_2 = "tcp://localhost:61636";
private static final String BROKER_3 = "tcp://localhost:61646"; private static final String BROKER_3 = "tcp://localhost:61646";
private final static String TOPIC_NAME = "broadcast"; private final static String TOPIC_NAME = "broadcast";
private static byte BASE_PRIORITY = -20;
private BrokerService broker1; private BrokerService broker1;
private BrokerService broker2; private BrokerService broker2;
private BrokerService broker3; private BrokerService broker3;
@ -128,7 +133,8 @@ public class NoDuplicateOnTopicNetworkTest extends CombinationTestSupport {
networkConnector.setDynamicOnly(dynamicOnly); networkConnector.setDynamicOnly(dynamicOnly);
networkConnector.setNetworkTTL(ttl); networkConnector.setNetworkTTL(ttl);
networkConnector.setSuppressDuplicateTopicSubscriptions(suppressDuplicateTopicSubs); networkConnector.setSuppressDuplicateTopicSubscriptions(suppressDuplicateTopicSubs);
networkConnector.setConsumerPriorityBase(BASE_PRIORITY);
networkConnector.addStaticallyIncludedDestination(new ActiveMQTopic("BeStaticallyIncluded"));
PolicyMap policyMap = new PolicyMap(); PolicyMap policyMap = new PolicyMap();
PolicyEntry policy = new PolicyEntry(); PolicyEntry policy = new PolicyEntry();
@ -197,6 +203,30 @@ public class NoDuplicateOnTopicNetworkTest extends CombinationTestSupport {
// ensure subscription has percolated though the network // ensure subscription has percolated though the network
Thread.sleep(2000); Thread.sleep(2000);
// verify network consumer priority
final RegionBroker regionBroker = (RegionBroker)broker1.getRegionBroker();
assertTrue("Found network destination with priority as expected", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
Map<ActiveMQDestination, Destination> destinationMap = regionBroker.getTopicRegion().getDestinationMap();
LOG.info("destinations: " + destinationMap.keySet());
boolean found = false;
for (Destination destination : destinationMap.values()) {
List<Subscription> subscriptions = destination.getConsumers();
LOG.info(destination + " subscriptions: " + subscriptions);
for (Subscription subscription : subscriptions) {
if (subscription.getConsumerInfo().isNetworkSubscription()) {
LOG.info("subscription: " + subscription + ", priority: " + subscription.getConsumerInfo().getPriority());
assertTrue("priority is < our base: " + subscription.getConsumerInfo().getPriority(),
subscription.getConsumerInfo().getPriority() <= BASE_PRIORITY);
found = true;
}
}
}
return found;
}
}));
producerThread.start(); producerThread.start();
LOG.info("Started Producer"); LOG.info("Started Producer");
producerThread.join(); producerThread.join();