From 9670af1e0d766aaaadf07eba6fd4756a82c84118 Mon Sep 17 00:00:00 2001 From: Gary Tully Date: Tue, 23 Jun 2009 22:29:47 +0000 Subject: [PATCH] resolve https://issues.apache.org/activemq/browse/AMQ-2298 - respect priority when suppressing duplicate network subscriptions git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@787855 13f79535-47bb-0310-9956-ffa450edef68 --- .../DemandForwardingBridgeSupport.java | 110 +++++++++++++----- .../activemq/network/NetworkConnector.java | 20 +++- .../usecases/ThreeBrokerQueueNetworkTest.java | 84 ++++++++++++- 3 files changed, 179 insertions(+), 35 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java index 0da0615c1c..43feb2bedc 100644 --- a/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java @@ -559,10 +559,6 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br // in a cyclic network there can be multiple bridges per broker that can propagate // a network subscription so there is a need to synchronise on a shared entity synchronized(brokerService.getVmConnectorURI()) { - if (isDuplicateNetworkSubscription(info)) { - // trace in method - return; - } if (addConsumerInfo(info)) { if (LOG.isDebugEnabled()) { LOG.debug(configuration.getBrokerName() + " bridging sub on " + localBroker + " from " + remoteBrokerName + " : " + info); @@ -637,8 +633,8 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br if (LOG.isDebugEnabled()) { LOG.debug(configuration.getBrokerName() + " remove local subscription for remote " + sub.getRemoteInfo().getConsumerId()); } - subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId()); localBroker.oneway(sub.getLocalInfo().createRemoveCommand()); + subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId()); } } @@ -951,50 +947,88 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br } protected boolean addConsumerInfo(final ConsumerInfo consumerInfo) throws IOException { - boolean result = false; + boolean consumerAdded = false; ConsumerInfo info = consumerInfo.copy(); addRemoteBrokerToBrokerPath(info); DemandSubscription sub = createDemandSubscription(info); if (sub != null) { - addSubscription(sub); - result = true; + if (duplicateSuppressionIsRequired(sub) ) { + undoMapRegistration(sub); + } else { + addSubscription(sub); + consumerAdded = true; + } } - return result; + return consumerAdded; } + private void undoMapRegistration(DemandSubscription sub) { + subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId()); + subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId()); + } + /* * check our existing subs networkConsumerIds against the list of network ids in this subscription * A match means a duplicate which we suppress for topics and maybe for queues */ - private boolean isDuplicateNetworkSubscription(ConsumerInfo consumerInfo) { - boolean isDuplicate = false; + private boolean duplicateSuppressionIsRequired(DemandSubscription candidate) { + final ConsumerInfo consumerInfo = candidate.getRemoteInfo(); + boolean suppress = false; if (consumerInfo.getDestination().isQueue() && !configuration.isSuppressDuplicateQueueSubscriptions()) { - return isDuplicate; - } - - List candidateConsumers = consumerInfo.getNetworkConsumerIds(); - if (candidateConsumers.isEmpty()) { - candidateConsumers.add(consumerInfo.getConsumerId()); + return suppress; } + List candidateConsumers = consumerInfo.getNetworkConsumerIds(); Collection currentSubs = getRegionSubscriptions(consumerInfo.getDestination().isTopic()); for (Subscription sub : currentSubs) { List networkConsumers = sub.getConsumerInfo().getNetworkConsumerIds(); if (!networkConsumers.isEmpty()) { if (matchFound(candidateConsumers, networkConsumers)) { - if (LOG.isDebugEnabled()) { - LOG.debug(configuration.getBrokerName() + " Ignoring duplicate subscription from " + remoteBrokerName - + ", sub: " + consumerInfo + " is duplicated by network subscription: " - + sub.getConsumerInfo() + ", networkComsumerIds: " + networkConsumers); - } - isDuplicate = true; + suppress = hasLowerPriority(sub, candidate.getLocalInfo()); break; } } } - return isDuplicate; + return suppress; + } + + + private boolean hasLowerPriority(Subscription existingSub, ConsumerInfo candidateInfo) { + boolean suppress = false; + + if (existingSub.getConsumerInfo().getPriority() >= candidateInfo.getPriority()) { + if (LOG.isDebugEnabled()) { + LOG.debug(configuration.getBrokerName() + " Ignoring duplicate subscription from " + remoteBrokerName + + ", sub: " + candidateInfo + " is duplicated by network subscription with equal or higher network priority: " + + existingSub.getConsumerInfo() + ", networkComsumerIds: " + existingSub.getConsumerInfo().getNetworkConsumerIds()); + } + suppress = true; + } else { + // remove the existing lower priority duplicate and allow this candidate + try { + removeDuplicateSubscription(existingSub); + + if (LOG.isDebugEnabled()) { + LOG.debug(configuration.getBrokerName() + " Replacing duplicate subscription " + existingSub.getConsumerInfo() + + " with sub from " + remoteBrokerName + + ", which has a higher priority, new sub: " + candidateInfo + ", networkComsumerIds: " + + candidateInfo.getNetworkConsumerIds()); + } + } catch (IOException e) { + LOG.error("Failed to remove duplicated sub as a result of sub with higher priority, sub: "+ existingSub, e); + } + } + return suppress; + } + + private void removeDuplicateSubscription(Subscription existingSub) throws IOException { + for (NetworkConnector connector: brokerService.getNetworkConnectors()) { + if (connector.removeDemandSubscription(existingSub.getConsumerInfo().getConsumerId())) { + break; + } + } } private boolean matchFound(List candidateConsumers, List networkConsumers) { @@ -1034,15 +1068,14 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br if (configuration.isDecreaseNetworkConsumerPriority()) { byte priority = ConsumerInfo.NETWORK_CONSUMER_PRIORITY; - if (priority > Byte.MIN_VALUE && info.getBrokerPath() != null && info.getBrokerPath().length > 1) { - // The longer the path to the consumer, the less it's consumer - // priority. - priority -= info.getBrokerPath().length + 1; - if (LOG.isDebugEnabled()) { - LOG.debug(configuration.getBrokerName() + " using priority :" + priority + " for subscription: " + info); - } + if (info.getBrokerPath() != null && info.getBrokerPath().length > 1) { + // The longer the path to the consumer, the less it's consumer priority. + priority -= info.getBrokerPath().length + 1; } result.getLocalInfo().setPriority(priority); + if (LOG.isDebugEnabled()) { + LOG.debug(configuration.getBrokerName() + " using priority :" + priority + " for subscription: " + info); + } } configureDemandSubscription(info, result); return result; @@ -1079,6 +1112,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br sub.getLocalInfo().setAdditionalPredicate(createNetworkBridgeFilter(info)); } + protected void removeDemandSubscription(ConsumerId id) throws IOException { DemandSubscription sub = subscriptionMapByRemoteId.remove(id); if (LOG.isDebugEnabled()) { @@ -1091,6 +1125,20 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br } } } + + protected boolean removeDemandSubscriptionByLocalId(ConsumerId consumerId) { + boolean removeDone = false; + DemandSubscription sub = subscriptionMapByLocalId.get(consumerId); + if (sub != null) { + try { + removeDemandSubscription(sub.getRemoteInfo().getConsumerId()); + removeDone = true; + } catch (IOException e) { + LOG.debug("removeDemandSubscriptionByLocalId failed for localId: " + consumerId, e); + } + } + return removeDone; + } protected void waitStarted() throws InterruptedException { startedLatch.await(); diff --git a/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java b/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java index 96ad208e51..13d6ef52a0 100644 --- a/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java @@ -16,13 +16,16 @@ */ package org.apache.activemq.network; +import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CopyOnWriteArrayList; import javax.management.MBeanServer; @@ -34,6 +37,7 @@ import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.jmx.NetworkBridgeView; import org.apache.activemq.broker.jmx.NetworkBridgeViewMBean; import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ConsumerId; import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.TransportFactory; import org.apache.activemq.util.JMXSupport; @@ -67,7 +71,8 @@ public abstract class NetworkConnector extends NetworkBridgeConfiguration implem private List staticallyIncludedDestinations = new CopyOnWriteArrayList(); private BrokerService brokerService; private ObjectName objectName; - + private ConcurrentLinkedQueue configuredBridges = new ConcurrentLinkedQueue(); + public NetworkConnector() { } @@ -186,6 +191,7 @@ public abstract class NetworkConnector extends NetworkBridgeConfiguration implem dest = (ActiveMQDestination[])topics.toArray(dest); result.setDurableDestinations(dest); } + configuredBridges.add(result); return result; } @@ -271,4 +277,16 @@ public abstract class NetworkConnector extends NetworkBridgeConfiguration implem + JMXSupport.encodeObjectNamePart(JMXSupport.encodeObjectNamePart(bridge.getRemoteAddress()))); } + // ask all the bridges as we can't know to which this consumer is tied + public boolean removeDemandSubscription(ConsumerId consumerId) { + boolean removeSucceeded = false; + for (DemandForwardingBridgeSupport bridge: configuredBridges) { + if (bridge.removeDemandSubscriptionByLocalId(consumerId)) { + removeSucceeded = true; + break; + } + } + return removeSucceeded; + } + } diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java index 3eb3a20f64..a0adb064e5 100644 --- a/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java @@ -22,15 +22,24 @@ import java.util.HashMap; import java.util.Iterator; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import javax.jms.Destination; import javax.jms.MessageConsumer; import org.apache.activemq.JmsMultipleBrokersTestSupport; +import org.apache.activemq.JmsMultipleBrokersTestSupport.BrokerItem; +import org.apache.activemq.broker.Broker; +import org.apache.activemq.broker.BrokerFilter; +import org.apache.activemq.broker.BrokerPlugin; import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.Queue; import org.apache.activemq.broker.region.RegionBroker; +import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.command.MessageDispatch; import org.apache.activemq.util.MessageIdList; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -322,7 +331,7 @@ public class ThreeBrokerQueueNetworkTest extends JmsMultipleBrokersTestSupport { // Send messages sendMessages("BrokerB", dest, messageCount); - assertTrue(messagesReceived.await(30, TimeUnit.SECONDS)); + assertTrue("messaged received within time limit", messagesReceived.await(30, TimeUnit.SECONDS)); // Get message count MessageIdList msgsA = getConsumerMessages("BrokerA", clientA); @@ -424,7 +433,7 @@ public class ThreeBrokerQueueNetworkTest extends JmsMultipleBrokersTestSupport { sendMessages("BrokerB", dest, messageCount); // Let's try to wait for any messages. - assertTrue(messagesReceived.await(60, TimeUnit.SECONDS)); + assertTrue("messages are received within limit", messagesReceived.await(60, TimeUnit.SECONDS)); assertEquals(messageCount, msgs.getMessageCount()); } @@ -451,8 +460,68 @@ public class ThreeBrokerQueueNetworkTest extends JmsMultipleBrokersTestSupport { verifyConsumerCount(broker, 1, dest); } } + + public void testNoDuplicateQueueSubsHasLowestPriority() throws Exception { + boolean suppressQueueDuplicateSubscriptions = true; + boolean decreaseNetworkConsumerPriority = true; + bridgeAllBrokers("default", 3, suppressQueueDuplicateSubscriptions, decreaseNetworkConsumerPriority); + + // Setup destination + final Destination dest = createDestination("TEST.FOO", false); + + // delay the advisory messages so that one can percolate fully (cyclicly) before the other + BrokerItem brokerB = brokers.get("BrokerA"); + brokerB.broker.setPlugins(new BrokerPlugin[]{new BrokerPlugin() { + + public Broker installPlugin(Broker broker) throws Exception { + return new BrokerFilter(broker) { + + final AtomicInteger count = new AtomicInteger(); + @Override + public void preProcessDispatch( + MessageDispatch messageDispatch) { + if (messageDispatch.getDestination().getPhysicalName().contains("ActiveMQ.Advisory.Consumer")) { + // lets delay the first advisory + if (count.getAndIncrement() == 0) { + LOG.info("Sleeping on first advisory: " + messageDispatch); + try { + Thread.sleep(2000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + super.postProcessDispatch(messageDispatch); + } + + }; + }} + }); + + startAllBrokers(); + + + // Setup consumers + String brokerName = "BrokerA"; + createConsumer(brokerName, dest); + + // wait for advisories + Thread.sleep(5000); + + // verify there is one consumer on each broker, no cycles + Collection brokerList = brokers.values(); + for (Iterator i = brokerList.iterator(); i.hasNext();) { + BrokerService broker = i.next().broker; + verifyConsumerCount(broker, 1, dest); + if (!brokerName.equals(broker.getBrokerName())) { + verifyConsumePriority(broker, ConsumerInfo.NETWORK_CONSUMER_PRIORITY, dest); + } + } + } + + public void testDuplicateQueueSubs() throws Exception { bridgeAllBrokers("default", 3, false); @@ -477,16 +546,25 @@ public class ThreeBrokerQueueNetworkTest extends JmsMultipleBrokersTestSupport { BrokerService broker = i.next().broker; if (!brokerName.equals(broker.getBrokerName())) { verifyConsumerCount(broker, 2, dest); + verifyConsumePriority(broker, ConsumerInfo.NORMAL_PRIORITY, dest); } } } private void verifyConsumerCount(BrokerService broker, int count, Destination dest) throws Exception { RegionBroker regionBroker = (RegionBroker) broker.getRegionBroker(); - Queue internalQueue = (Queue) regionBroker.getDestinations(ActiveMQDestination.transform(dest)).iterator().next(); + Queue internalQueue = (Queue) regionBroker.getDestinations(ActiveMQDestination.transform(dest)).iterator().next(); assertEquals("consumer count on " + broker.getBrokerName() + " matches for q: " + internalQueue, count, internalQueue.getConsumers().size()); } + private void verifyConsumePriority(BrokerService broker, byte expectedPriority, Destination dest) throws Exception { + RegionBroker regionBroker = (RegionBroker) broker.getRegionBroker(); + Queue internalQueue = (Queue) regionBroker.getDestinations(ActiveMQDestination.transform(dest)).iterator().next(); + for (Subscription consumer : internalQueue.getConsumers()) { + assertEquals("consumer on " + broker.getBrokerName() + " matches priority: " + internalQueue, expectedPriority, consumer.getConsumerInfo().getPriority()); + } + } + public void setUp() throws Exception { super.setAutoFail(true); super.setUp();