implement AMQ-2198|https://issues.apache.org/activemq/browse/AMQ-2198 add setSuppressDuplicateQueueSubscriptions option to Network configuration, when true, duplicate subs for the same consumer will be suppressed, relevant when networkTTL > 1

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@762925 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2009-04-07 19:48:29 +00:00
parent 9f548bb74a
commit 5511217fb1
4 changed files with 110 additions and 24 deletions

View File

@ -940,27 +940,32 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
/*
* check our existing subs networkConsumerIds against the list of network ids in this subscription
* a match means a duplicate which we suppress for topics
* A match means a duplicate which we suppress for topics and maybe for queues
*/
private boolean isDuplicateNetworkSubscription(ConsumerInfo consumerInfo) {
boolean isDuplicate = false;
if (consumerInfo.getDestination().isTopic()) {
List<ConsumerId> candidateConsumers = consumerInfo.getNetworkConsumerIds();
if (candidateConsumers.isEmpty()) {
candidateConsumers.add(consumerInfo.getConsumerId());
}
Collection<Subscription> currentSubs = getTopicRegionSubscriptions();
for (Subscription sub : currentSubs) {
List<ConsumerId> networkConsumers = sub.getConsumerInfo().getNetworkConsumerIds();
if (!networkConsumers.isEmpty()) {
if (matchFound(candidateConsumers, networkConsumers)) {
if (LOG.isDebugEnabled()) {
LOG.debug("subscription: " + consumerInfo + " is duplicated by network subscription: "
+ sub.getConsumerInfo() + ", networkComsumerIds: " + networkConsumers);
}
isDuplicate = true;
break;
if (consumerInfo.getDestination().isQueue() && !configuration.isSuppressDuplicateQueueSubscriptions()) {
return isDuplicate;
}
List<ConsumerId> candidateConsumers = consumerInfo.getNetworkConsumerIds();
if (candidateConsumers.isEmpty()) {
candidateConsumers.add(consumerInfo.getConsumerId());
}
Collection<Subscription> currentSubs =
getRegionSubscriptions(consumerInfo.getDestination().isTopic());
for (Subscription sub : currentSubs) {
List<ConsumerId> networkConsumers = sub.getConsumerInfo().getNetworkConsumerIds();
if (!networkConsumers.isEmpty()) {
if (matchFound(candidateConsumers, networkConsumers)) {
if (LOG.isDebugEnabled()) {
LOG.debug("subscription: " + consumerInfo + " is duplicated by network subscription: "
+ sub.getConsumerInfo() + ", networkComsumerIds: " + networkConsumers);
}
isDuplicate = true;
break;
}
}
}
@ -978,12 +983,13 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
return found;
}
private final Collection<Subscription> getTopicRegionSubscriptions() {
private final Collection<Subscription> getRegionSubscriptions(boolean isTopic) {
RegionBroker region = (RegionBroker) brokerService.getRegionBroker();
AbstractRegion topicRegion = (AbstractRegion) region.getTopicRegion();
return topicRegion.getSubscriptions().values();
AbstractRegion abstractRegion = (AbstractRegion)
(isTopic ? region.getTopicRegion() : region.getQueueRegion());
return abstractRegion.getSubscriptions().values();
}
protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException {
//add our original id to ourselves
info.addNetworkConsumerId(info.getConsumerId());

View File

@ -36,6 +36,7 @@ public class NetworkBridgeConfiguration {
private String password;
private String destinationFilter = ">";
private String name = null;
private boolean suppressDuplicateQueueSubscriptions = false;
/**
* @return the conduitSubscriptions
@ -222,4 +223,16 @@ public class NetworkBridgeConfiguration {
public void setName(String name) {
this.name = name;
}
public boolean isSuppressDuplicateQueueSubscriptions() {
return suppressDuplicateQueueSubscriptions;
}
/**
*
* @param val if true, duplicate network queue subscriptions (in a cyclic network) will be suppressed
*/
public void setSuppressDuplicateQueueSubscriptions(boolean val) {
suppressDuplicateQueueSubscriptions = val;
}
}

View File

@ -115,10 +115,10 @@ public class JmsMultipleBrokersTestSupport extends CombinationTestSupport {
// This will interconnect all brokes using multicast
protected void bridgeAllBrokers() throws Exception {
bridgeAllBrokers("default");
bridgeAllBrokers("default", 1, false);
}
protected void bridgeAllBrokers(String groupName) throws Exception {
protected void bridgeAllBrokers(String groupName, int ttl, boolean suppressduplicateQueueSubs) throws Exception {
Collection<BrokerItem> brokerList = brokers.values();
for (Iterator<BrokerItem> i = brokerList.iterator(); i.hasNext();) {
BrokerService broker = i.next().broker;
@ -131,7 +131,9 @@ public class JmsMultipleBrokersTestSupport extends CombinationTestSupport {
TransportConnector transport = (TransportConnector)transportConnectors.get(0);
transport.setDiscoveryUri(new URI("multicast://default?group=" + groupName));
broker.addNetworkConnector("multicast://default?group=" + groupName);
NetworkConnector nc = broker.addNetworkConnector("multicast://default?group=" + groupName);
nc.setNetworkTTL(ttl);
nc.setSuppressDuplicateQueueSubscriptions(suppressduplicateQueueSubs);
}
// Multicasting may take longer to setup

View File

@ -17,12 +17,18 @@
package org.apache.activemq.usecases;
import java.net.URI;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.util.MessageIdList;
/**
@ -270,6 +276,65 @@ public class ThreeBrokerQueueNetworkTest extends JmsMultipleBrokersTestSupport {
assertEquals(MESSAGE_COUNT * 3, msgsA.getMessageCount() + msgsB.getMessageCount() + msgsC.getMessageCount());
}
public void testNoDuplicateQueueSubs() throws Exception {
bridgeAllBrokers("default", 3, true);
startAllBrokers();
// Setup destination
Destination dest = createDestination("TEST.FOO", false);
// Setup consumers
String brokerName = "BrokerA";
createConsumer(brokerName, dest);
// wait for advisories
Thread.sleep(2000);
// verify there is one consumer on each broker, no cycles
Collection<BrokerItem> brokerList = brokers.values();
for (Iterator<BrokerItem> i = brokerList.iterator(); i.hasNext();) {
BrokerService broker = i.next().broker;
verifyConsumerCount(broker, 1, dest);
}
}
public void testDuplicateQueueSubs() throws Exception {
bridgeAllBrokers("default", 3, false);
startAllBrokers();
// Setup destination
Destination dest = createDestination("TEST.FOO", false);
// Setup consumers
String brokerName = "BrokerA";
createConsumer(brokerName, dest);
// wait for advisories
Thread.sleep(2000);
verifyConsumerCount(brokers.get(brokerName).broker, 1, dest);
// in a cyclic network, other brokers will get second order consumer
// an alternative route to A via each other
Collection<BrokerItem> brokerList = brokers.values();
for (Iterator<BrokerItem> i = brokerList.iterator(); i.hasNext();) {
BrokerService broker = i.next().broker;
if (!brokerName.equals(broker.getBrokerName())) {
verifyConsumerCount(broker, 2, dest);
}
}
}
private void verifyConsumerCount(BrokerService broker, int count, Destination dest) throws Exception {
RegionBroker regionBroker = (RegionBroker) broker.getRegionBroker();
Queue internalQueue = (Queue) regionBroker.getDestinations(ActiveMQDestination.transform(dest)).iterator().next();
assertEquals("consumer count on " + broker.getBrokerName() + " matches for q: " + internalQueue, count, internalQueue.getConsumers().size());
}
public void setUp() throws Exception {
super.setAutoFail(true);
super.setUp();