mirror of https://github.com/apache/activemq.git
resolve https://issues.apache.org/activemq/browse/AMQ-2298 - respect priority when suppressing duplicate network subscriptions
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@787855 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
de4b78397e
commit
9670af1e0d
|
@ -559,10 +559,6 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
||||||
// in a cyclic network there can be multiple bridges per broker that can propagate
|
// in a cyclic network there can be multiple bridges per broker that can propagate
|
||||||
// a network subscription so there is a need to synchronise on a shared entity
|
// a network subscription so there is a need to synchronise on a shared entity
|
||||||
synchronized(brokerService.getVmConnectorURI()) {
|
synchronized(brokerService.getVmConnectorURI()) {
|
||||||
if (isDuplicateNetworkSubscription(info)) {
|
|
||||||
// trace in method
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
if (addConsumerInfo(info)) {
|
if (addConsumerInfo(info)) {
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug(configuration.getBrokerName() + " bridging sub on " + localBroker + " from " + remoteBrokerName + " : " + info);
|
LOG.debug(configuration.getBrokerName() + " bridging sub on " + localBroker + " from " + remoteBrokerName + " : " + info);
|
||||||
|
@ -637,8 +633,8 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug(configuration.getBrokerName() + " remove local subscription for remote " + sub.getRemoteInfo().getConsumerId());
|
LOG.debug(configuration.getBrokerName() + " remove local subscription for remote " + sub.getRemoteInfo().getConsumerId());
|
||||||
}
|
}
|
||||||
subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId());
|
|
||||||
localBroker.oneway(sub.getLocalInfo().createRemoveCommand());
|
localBroker.oneway(sub.getLocalInfo().createRemoveCommand());
|
||||||
|
subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -951,50 +947,88 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
||||||
}
|
}
|
||||||
|
|
||||||
protected boolean addConsumerInfo(final ConsumerInfo consumerInfo) throws IOException {
|
protected boolean addConsumerInfo(final ConsumerInfo consumerInfo) throws IOException {
|
||||||
boolean result = false;
|
boolean consumerAdded = false;
|
||||||
ConsumerInfo info = consumerInfo.copy();
|
ConsumerInfo info = consumerInfo.copy();
|
||||||
addRemoteBrokerToBrokerPath(info);
|
addRemoteBrokerToBrokerPath(info);
|
||||||
DemandSubscription sub = createDemandSubscription(info);
|
DemandSubscription sub = createDemandSubscription(info);
|
||||||
if (sub != null) {
|
if (sub != null) {
|
||||||
addSubscription(sub);
|
if (duplicateSuppressionIsRequired(sub) ) {
|
||||||
result = true;
|
undoMapRegistration(sub);
|
||||||
|
} else {
|
||||||
|
addSubscription(sub);
|
||||||
|
consumerAdded = true;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return result;
|
return consumerAdded;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void undoMapRegistration(DemandSubscription sub) {
|
||||||
|
subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId());
|
||||||
|
subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId());
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* check our existing subs networkConsumerIds against the list of network ids in this subscription
|
* check our existing subs networkConsumerIds against the list of network ids in this subscription
|
||||||
* A match means a duplicate which we suppress for topics and maybe for queues
|
* A match means a duplicate which we suppress for topics and maybe for queues
|
||||||
*/
|
*/
|
||||||
private boolean isDuplicateNetworkSubscription(ConsumerInfo consumerInfo) {
|
private boolean duplicateSuppressionIsRequired(DemandSubscription candidate) {
|
||||||
boolean isDuplicate = false;
|
final ConsumerInfo consumerInfo = candidate.getRemoteInfo();
|
||||||
|
boolean suppress = false;
|
||||||
|
|
||||||
if (consumerInfo.getDestination().isQueue() && !configuration.isSuppressDuplicateQueueSubscriptions()) {
|
if (consumerInfo.getDestination().isQueue() && !configuration.isSuppressDuplicateQueueSubscriptions()) {
|
||||||
return isDuplicate;
|
return suppress;
|
||||||
}
|
|
||||||
|
|
||||||
List<ConsumerId> candidateConsumers = consumerInfo.getNetworkConsumerIds();
|
|
||||||
if (candidateConsumers.isEmpty()) {
|
|
||||||
candidateConsumers.add(consumerInfo.getConsumerId());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
List<ConsumerId> candidateConsumers = consumerInfo.getNetworkConsumerIds();
|
||||||
Collection<Subscription> currentSubs =
|
Collection<Subscription> currentSubs =
|
||||||
getRegionSubscriptions(consumerInfo.getDestination().isTopic());
|
getRegionSubscriptions(consumerInfo.getDestination().isTopic());
|
||||||
for (Subscription sub : currentSubs) {
|
for (Subscription sub : currentSubs) {
|
||||||
List<ConsumerId> networkConsumers = sub.getConsumerInfo().getNetworkConsumerIds();
|
List<ConsumerId> networkConsumers = sub.getConsumerInfo().getNetworkConsumerIds();
|
||||||
if (!networkConsumers.isEmpty()) {
|
if (!networkConsumers.isEmpty()) {
|
||||||
if (matchFound(candidateConsumers, networkConsumers)) {
|
if (matchFound(candidateConsumers, networkConsumers)) {
|
||||||
if (LOG.isDebugEnabled()) {
|
suppress = hasLowerPriority(sub, candidate.getLocalInfo());
|
||||||
LOG.debug(configuration.getBrokerName() + " Ignoring duplicate subscription from " + remoteBrokerName
|
|
||||||
+ ", sub: " + consumerInfo + " is duplicated by network subscription: "
|
|
||||||
+ sub.getConsumerInfo() + ", networkComsumerIds: " + networkConsumers);
|
|
||||||
}
|
|
||||||
isDuplicate = true;
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return isDuplicate;
|
return suppress;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private boolean hasLowerPriority(Subscription existingSub, ConsumerInfo candidateInfo) {
|
||||||
|
boolean suppress = false;
|
||||||
|
|
||||||
|
if (existingSub.getConsumerInfo().getPriority() >= candidateInfo.getPriority()) {
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug(configuration.getBrokerName() + " Ignoring duplicate subscription from " + remoteBrokerName
|
||||||
|
+ ", sub: " + candidateInfo + " is duplicated by network subscription with equal or higher network priority: "
|
||||||
|
+ existingSub.getConsumerInfo() + ", networkComsumerIds: " + existingSub.getConsumerInfo().getNetworkConsumerIds());
|
||||||
|
}
|
||||||
|
suppress = true;
|
||||||
|
} else {
|
||||||
|
// remove the existing lower priority duplicate and allow this candidate
|
||||||
|
try {
|
||||||
|
removeDuplicateSubscription(existingSub);
|
||||||
|
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug(configuration.getBrokerName() + " Replacing duplicate subscription " + existingSub.getConsumerInfo()
|
||||||
|
+ " with sub from " + remoteBrokerName
|
||||||
|
+ ", which has a higher priority, new sub: " + candidateInfo + ", networkComsumerIds: "
|
||||||
|
+ candidateInfo.getNetworkConsumerIds());
|
||||||
|
}
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.error("Failed to remove duplicated sub as a result of sub with higher priority, sub: "+ existingSub, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return suppress;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void removeDuplicateSubscription(Subscription existingSub) throws IOException {
|
||||||
|
for (NetworkConnector connector: brokerService.getNetworkConnectors()) {
|
||||||
|
if (connector.removeDemandSubscription(existingSub.getConsumerInfo().getConsumerId())) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean matchFound(List<ConsumerId> candidateConsumers, List<ConsumerId> networkConsumers) {
|
private boolean matchFound(List<ConsumerId> candidateConsumers, List<ConsumerId> networkConsumers) {
|
||||||
|
@ -1034,15 +1068,14 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
||||||
|
|
||||||
if (configuration.isDecreaseNetworkConsumerPriority()) {
|
if (configuration.isDecreaseNetworkConsumerPriority()) {
|
||||||
byte priority = ConsumerInfo.NETWORK_CONSUMER_PRIORITY;
|
byte priority = ConsumerInfo.NETWORK_CONSUMER_PRIORITY;
|
||||||
if (priority > Byte.MIN_VALUE && info.getBrokerPath() != null && info.getBrokerPath().length > 1) {
|
if (info.getBrokerPath() != null && info.getBrokerPath().length > 1) {
|
||||||
// The longer the path to the consumer, the less it's consumer
|
// The longer the path to the consumer, the less it's consumer priority.
|
||||||
// priority.
|
priority -= info.getBrokerPath().length + 1;
|
||||||
priority -= info.getBrokerPath().length + 1;
|
|
||||||
if (LOG.isDebugEnabled()) {
|
|
||||||
LOG.debug(configuration.getBrokerName() + " using priority :" + priority + " for subscription: " + info);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
result.getLocalInfo().setPriority(priority);
|
result.getLocalInfo().setPriority(priority);
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug(configuration.getBrokerName() + " using priority :" + priority + " for subscription: " + info);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
configureDemandSubscription(info, result);
|
configureDemandSubscription(info, result);
|
||||||
return result;
|
return result;
|
||||||
|
@ -1079,6 +1112,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
||||||
sub.getLocalInfo().setAdditionalPredicate(createNetworkBridgeFilter(info));
|
sub.getLocalInfo().setAdditionalPredicate(createNetworkBridgeFilter(info));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
protected void removeDemandSubscription(ConsumerId id) throws IOException {
|
protected void removeDemandSubscription(ConsumerId id) throws IOException {
|
||||||
DemandSubscription sub = subscriptionMapByRemoteId.remove(id);
|
DemandSubscription sub = subscriptionMapByRemoteId.remove(id);
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
|
@ -1091,6 +1125,20 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected boolean removeDemandSubscriptionByLocalId(ConsumerId consumerId) {
|
||||||
|
boolean removeDone = false;
|
||||||
|
DemandSubscription sub = subscriptionMapByLocalId.get(consumerId);
|
||||||
|
if (sub != null) {
|
||||||
|
try {
|
||||||
|
removeDemandSubscription(sub.getRemoteInfo().getConsumerId());
|
||||||
|
removeDone = true;
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.debug("removeDemandSubscriptionByLocalId failed for localId: " + consumerId, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return removeDone;
|
||||||
|
}
|
||||||
|
|
||||||
protected void waitStarted() throws InterruptedException {
|
protected void waitStarted() throws InterruptedException {
|
||||||
startedLatch.await();
|
startedLatch.await();
|
||||||
|
|
|
@ -16,13 +16,16 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.network;
|
package org.apache.activemq.network;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.net.URISyntaxException;
|
import java.net.URISyntaxException;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||||
import java.util.concurrent.CopyOnWriteArrayList;
|
import java.util.concurrent.CopyOnWriteArrayList;
|
||||||
|
|
||||||
import javax.management.MBeanServer;
|
import javax.management.MBeanServer;
|
||||||
|
@ -34,6 +37,7 @@ import org.apache.activemq.broker.BrokerService;
|
||||||
import org.apache.activemq.broker.jmx.NetworkBridgeView;
|
import org.apache.activemq.broker.jmx.NetworkBridgeView;
|
||||||
import org.apache.activemq.broker.jmx.NetworkBridgeViewMBean;
|
import org.apache.activemq.broker.jmx.NetworkBridgeViewMBean;
|
||||||
import org.apache.activemq.command.ActiveMQDestination;
|
import org.apache.activemq.command.ActiveMQDestination;
|
||||||
|
import org.apache.activemq.command.ConsumerId;
|
||||||
import org.apache.activemq.transport.Transport;
|
import org.apache.activemq.transport.Transport;
|
||||||
import org.apache.activemq.transport.TransportFactory;
|
import org.apache.activemq.transport.TransportFactory;
|
||||||
import org.apache.activemq.util.JMXSupport;
|
import org.apache.activemq.util.JMXSupport;
|
||||||
|
@ -67,7 +71,8 @@ public abstract class NetworkConnector extends NetworkBridgeConfiguration implem
|
||||||
private List<ActiveMQDestination> staticallyIncludedDestinations = new CopyOnWriteArrayList<ActiveMQDestination>();
|
private List<ActiveMQDestination> staticallyIncludedDestinations = new CopyOnWriteArrayList<ActiveMQDestination>();
|
||||||
private BrokerService brokerService;
|
private BrokerService brokerService;
|
||||||
private ObjectName objectName;
|
private ObjectName objectName;
|
||||||
|
private ConcurrentLinkedQueue<DemandForwardingBridgeSupport> configuredBridges = new ConcurrentLinkedQueue<DemandForwardingBridgeSupport>();
|
||||||
|
|
||||||
public NetworkConnector() {
|
public NetworkConnector() {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -186,6 +191,7 @@ public abstract class NetworkConnector extends NetworkBridgeConfiguration implem
|
||||||
dest = (ActiveMQDestination[])topics.toArray(dest);
|
dest = (ActiveMQDestination[])topics.toArray(dest);
|
||||||
result.setDurableDestinations(dest);
|
result.setDurableDestinations(dest);
|
||||||
}
|
}
|
||||||
|
configuredBridges.add(result);
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -271,4 +277,16 @@ public abstract class NetworkConnector extends NetworkBridgeConfiguration implem
|
||||||
+ JMXSupport.encodeObjectNamePart(JMXSupport.encodeObjectNamePart(bridge.getRemoteAddress())));
|
+ JMXSupport.encodeObjectNamePart(JMXSupport.encodeObjectNamePart(bridge.getRemoteAddress())));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ask all the bridges as we can't know to which this consumer is tied
|
||||||
|
public boolean removeDemandSubscription(ConsumerId consumerId) {
|
||||||
|
boolean removeSucceeded = false;
|
||||||
|
for (DemandForwardingBridgeSupport bridge: configuredBridges) {
|
||||||
|
if (bridge.removeDemandSubscriptionByLocalId(consumerId)) {
|
||||||
|
removeSucceeded = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return removeSucceeded;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,15 +22,24 @@ import java.util.HashMap;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
import javax.jms.Destination;
|
import javax.jms.Destination;
|
||||||
import javax.jms.MessageConsumer;
|
import javax.jms.MessageConsumer;
|
||||||
|
|
||||||
import org.apache.activemq.JmsMultipleBrokersTestSupport;
|
import org.apache.activemq.JmsMultipleBrokersTestSupport;
|
||||||
|
import org.apache.activemq.JmsMultipleBrokersTestSupport.BrokerItem;
|
||||||
|
import org.apache.activemq.broker.Broker;
|
||||||
|
import org.apache.activemq.broker.BrokerFilter;
|
||||||
|
import org.apache.activemq.broker.BrokerPlugin;
|
||||||
import org.apache.activemq.broker.BrokerService;
|
import org.apache.activemq.broker.BrokerService;
|
||||||
|
import org.apache.activemq.broker.ConnectionContext;
|
||||||
import org.apache.activemq.broker.region.Queue;
|
import org.apache.activemq.broker.region.Queue;
|
||||||
import org.apache.activemq.broker.region.RegionBroker;
|
import org.apache.activemq.broker.region.RegionBroker;
|
||||||
|
import org.apache.activemq.broker.region.Subscription;
|
||||||
import org.apache.activemq.command.ActiveMQDestination;
|
import org.apache.activemq.command.ActiveMQDestination;
|
||||||
|
import org.apache.activemq.command.ConsumerInfo;
|
||||||
|
import org.apache.activemq.command.MessageDispatch;
|
||||||
import org.apache.activemq.util.MessageIdList;
|
import org.apache.activemq.util.MessageIdList;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
@ -322,7 +331,7 @@ public class ThreeBrokerQueueNetworkTest extends JmsMultipleBrokersTestSupport {
|
||||||
// Send messages
|
// Send messages
|
||||||
sendMessages("BrokerB", dest, messageCount);
|
sendMessages("BrokerB", dest, messageCount);
|
||||||
|
|
||||||
assertTrue(messagesReceived.await(30, TimeUnit.SECONDS));
|
assertTrue("messaged received within time limit", messagesReceived.await(30, TimeUnit.SECONDS));
|
||||||
|
|
||||||
// Get message count
|
// Get message count
|
||||||
MessageIdList msgsA = getConsumerMessages("BrokerA", clientA);
|
MessageIdList msgsA = getConsumerMessages("BrokerA", clientA);
|
||||||
|
@ -424,7 +433,7 @@ public class ThreeBrokerQueueNetworkTest extends JmsMultipleBrokersTestSupport {
|
||||||
sendMessages("BrokerB", dest, messageCount);
|
sendMessages("BrokerB", dest, messageCount);
|
||||||
|
|
||||||
// Let's try to wait for any messages.
|
// Let's try to wait for any messages.
|
||||||
assertTrue(messagesReceived.await(60, TimeUnit.SECONDS));
|
assertTrue("messages are received within limit", messagesReceived.await(60, TimeUnit.SECONDS));
|
||||||
assertEquals(messageCount, msgs.getMessageCount());
|
assertEquals(messageCount, msgs.getMessageCount());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -451,8 +460,68 @@ public class ThreeBrokerQueueNetworkTest extends JmsMultipleBrokersTestSupport {
|
||||||
verifyConsumerCount(broker, 1, dest);
|
verifyConsumerCount(broker, 1, dest);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
public void testNoDuplicateQueueSubsHasLowestPriority() throws Exception {
|
||||||
|
boolean suppressQueueDuplicateSubscriptions = true;
|
||||||
|
boolean decreaseNetworkConsumerPriority = true;
|
||||||
|
bridgeAllBrokers("default", 3, suppressQueueDuplicateSubscriptions, decreaseNetworkConsumerPriority);
|
||||||
|
|
||||||
|
// Setup destination
|
||||||
|
final Destination dest = createDestination("TEST.FOO", false);
|
||||||
|
|
||||||
|
// delay the advisory messages so that one can percolate fully (cyclicly) before the other
|
||||||
|
BrokerItem brokerB = brokers.get("BrokerA");
|
||||||
|
brokerB.broker.setPlugins(new BrokerPlugin[]{new BrokerPlugin() {
|
||||||
|
|
||||||
|
public Broker installPlugin(Broker broker) throws Exception {
|
||||||
|
return new BrokerFilter(broker) {
|
||||||
|
|
||||||
|
final AtomicInteger count = new AtomicInteger();
|
||||||
|
@Override
|
||||||
|
public void preProcessDispatch(
|
||||||
|
MessageDispatch messageDispatch) {
|
||||||
|
if (messageDispatch.getDestination().getPhysicalName().contains("ActiveMQ.Advisory.Consumer")) {
|
||||||
|
// lets delay the first advisory
|
||||||
|
if (count.getAndIncrement() == 0) {
|
||||||
|
LOG.info("Sleeping on first advisory: " + messageDispatch);
|
||||||
|
try {
|
||||||
|
Thread.sleep(2000);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
super.postProcessDispatch(messageDispatch);
|
||||||
|
}
|
||||||
|
|
||||||
|
};
|
||||||
|
}}
|
||||||
|
});
|
||||||
|
|
||||||
|
startAllBrokers();
|
||||||
|
|
||||||
|
|
||||||
|
// Setup consumers
|
||||||
|
String brokerName = "BrokerA";
|
||||||
|
createConsumer(brokerName, dest);
|
||||||
|
|
||||||
|
// wait for advisories
|
||||||
|
Thread.sleep(5000);
|
||||||
|
|
||||||
|
// verify there is one consumer on each broker, no cycles
|
||||||
|
Collection<BrokerItem> brokerList = brokers.values();
|
||||||
|
for (Iterator<BrokerItem> i = brokerList.iterator(); i.hasNext();) {
|
||||||
|
BrokerService broker = i.next().broker;
|
||||||
|
verifyConsumerCount(broker, 1, dest);
|
||||||
|
if (!brokerName.equals(broker.getBrokerName())) {
|
||||||
|
verifyConsumePriority(broker, ConsumerInfo.NETWORK_CONSUMER_PRIORITY, dest);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
public void testDuplicateQueueSubs() throws Exception {
|
public void testDuplicateQueueSubs() throws Exception {
|
||||||
|
|
||||||
bridgeAllBrokers("default", 3, false);
|
bridgeAllBrokers("default", 3, false);
|
||||||
|
@ -477,16 +546,25 @@ public class ThreeBrokerQueueNetworkTest extends JmsMultipleBrokersTestSupport {
|
||||||
BrokerService broker = i.next().broker;
|
BrokerService broker = i.next().broker;
|
||||||
if (!brokerName.equals(broker.getBrokerName())) {
|
if (!brokerName.equals(broker.getBrokerName())) {
|
||||||
verifyConsumerCount(broker, 2, dest);
|
verifyConsumerCount(broker, 2, dest);
|
||||||
|
verifyConsumePriority(broker, ConsumerInfo.NORMAL_PRIORITY, dest);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void verifyConsumerCount(BrokerService broker, int count, Destination dest) throws Exception {
|
private void verifyConsumerCount(BrokerService broker, int count, Destination dest) throws Exception {
|
||||||
RegionBroker regionBroker = (RegionBroker) broker.getRegionBroker();
|
RegionBroker regionBroker = (RegionBroker) broker.getRegionBroker();
|
||||||
Queue internalQueue = (Queue) regionBroker.getDestinations(ActiveMQDestination.transform(dest)).iterator().next();
|
Queue internalQueue = (Queue) regionBroker.getDestinations(ActiveMQDestination.transform(dest)).iterator().next();
|
||||||
assertEquals("consumer count on " + broker.getBrokerName() + " matches for q: " + internalQueue, count, internalQueue.getConsumers().size());
|
assertEquals("consumer count on " + broker.getBrokerName() + " matches for q: " + internalQueue, count, internalQueue.getConsumers().size());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void verifyConsumePriority(BrokerService broker, byte expectedPriority, Destination dest) throws Exception {
|
||||||
|
RegionBroker regionBroker = (RegionBroker) broker.getRegionBroker();
|
||||||
|
Queue internalQueue = (Queue) regionBroker.getDestinations(ActiveMQDestination.transform(dest)).iterator().next();
|
||||||
|
for (Subscription consumer : internalQueue.getConsumers()) {
|
||||||
|
assertEquals("consumer on " + broker.getBrokerName() + " matches priority: " + internalQueue, expectedPriority, consumer.getConsumerInfo().getPriority());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public void setUp() throws Exception {
|
public void setUp() throws Exception {
|
||||||
super.setAutoFail(true);
|
super.setAutoFail(true);
|
||||||
super.setUp();
|
super.setUp();
|
||||||
|
|
Loading…
Reference in New Issue