diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java index f145fa2719..7daafc969a 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java @@ -113,7 +113,7 @@ public abstract class AbstractRegion implements Region { } public Destination addDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception { - LOG.debug("Adding destination: " + destination); + LOG.debug(broker.getBrokerName() + " adding destination: " + destination); synchronized (destinationsMutex) { Destination dest = destinations.get(destination); if (dest == null) { @@ -216,7 +216,7 @@ public abstract class AbstractRegion implements Region { } public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { - LOG.debug("Adding consumer: " + info.getConsumerId()); + LOG.debug(broker.getBrokerName() + " adding consumer: " + info.getConsumerId() + " for destination: " + info.getDestination()); ActiveMQDestination destination = info.getDestination(); if (destination != null && !destination.isPattern() && !destination.isComposite()) { // lets auto-create the destination @@ -308,7 +308,7 @@ public abstract class AbstractRegion implements Region { } public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { - LOG.debug("Removing consumer: " + info.getConsumerId()); + LOG.debug(broker.getBrokerName() + " removing consumer: " + info.getConsumerId() + " for destination: " + info.getDestination()); Subscription sub = subscriptions.remove(info.getConsumerId()); //The sub could be removed elsewhere - see ConnectionSplitBroker diff --git a/activemq-core/src/main/java/org/apache/activemq/command/NetworkBridgeFilter.java b/activemq-core/src/main/java/org/apache/activemq/command/NetworkBridgeFilter.java index e67d2b2740..eeea643a6c 100644 --- a/activemq-core/src/main/java/org/apache/activemq/command/NetworkBridgeFilter.java +++ b/activemq-core/src/main/java/org/apache/activemq/command/NetworkBridgeFilter.java @@ -17,6 +17,7 @@ package org.apache.activemq.command; import java.io.IOException; +import java.util.Arrays; import javax.jms.JMSException; @@ -75,7 +76,9 @@ public class NetworkBridgeFilter implements DataStructure, BooleanExpression { if (contains(message.getBrokerPath(), networkBrokerId)) { if (LOG.isTraceEnabled()) { - LOG.trace("Message all ready routed once through this broker - ignoring: " + message); + LOG.trace("Message all ready routed once through this broker (" + + networkBrokerId + "), path: " + + Arrays.toString(message.getBrokerPath()) + " - ignoring: " + message); } return false; } diff --git a/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java b/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java index 823974458f..a9404b6750 100644 --- a/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java @@ -64,7 +64,7 @@ public class ConduitBridge extends DemandForwardingBridge { DemandSubscription ds = (DemandSubscription)i.next(); if (filter.matches(ds.getLocalInfo().getDestination())) { if (LOG.isDebugEnabled()) { - LOG.debug(configuration.getBrokerName() + " matched exsting sub (add interest) for : " + ds.getRemoteInfo() + LOG.debug(configuration.getBrokerName() + " matched (add interest) to exsting sub for: " + ds.getRemoteInfo() + " with sub: " + info); } // add the interest in the subscription @@ -84,18 +84,17 @@ public class ConduitBridge extends DemandForwardingBridge { for (Iterator i = subscriptionMapByLocalId.values().iterator(); i.hasNext();) { DemandSubscription ds = (DemandSubscription)i.next(); - ds.remove(id); + if (ds.remove(id)) { + if (LOG.isDebugEnabled()) { + LOG.debug(configuration.getBrokerName() + " removing interest in sub on " + localBroker + " from " + remoteBrokerName + " : sub: " + id + " existing matched sub: " + ds.getRemoteInfo()); + } + } if (ds.isEmpty()) { tmpList.add(ds); - } else { - if (LOG.isDebugEnabled()) { - LOG.debug(configuration.getBrokerName() + " removing interest in sub on " + localBroker + " from " + remoteBrokerName + " : " + ds.getRemoteInfo()); - } } } for (Iterator i = tmpList.iterator(); i.hasNext();) { DemandSubscription ds = i.next(); - subscriptionMapByLocalId.remove(ds.getRemoteInfo().getConsumerId()); removeSubscription(ds); if (LOG.isDebugEnabled()) { LOG.debug(configuration.getBrokerName() + " removing sub on " + localBroker + " from " + remoteBrokerName + " : " + ds.getRemoteInfo()); diff --git a/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java index d1e7223451..38bdd8db46 100644 --- a/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java @@ -617,6 +617,9 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br protected void removeSubscription(DemandSubscription sub) throws IOException { if (sub != null) { + if (LOG.isDebugEnabled()) { + LOG.debug(configuration.getBrokerName() + " remove local subscription for remote " + sub.getRemoteInfo().getConsumerId()); + } subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId()); localBroker.oneway(sub.getLocalInfo().createRemoveCommand()); } diff --git a/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java b/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java index e1fc81f910..1cfb3ddddc 100644 --- a/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java +++ b/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java @@ -113,12 +113,16 @@ public class JmsMultipleBrokersTestSupport extends CombinationTestSupport { } - // This will interconnect all brokes using multicast + // This will interconnect all brokers using multicast protected void bridgeAllBrokers() throws Exception { - bridgeAllBrokers("default", 1, false); + bridgeAllBrokers("default", 1, false, false); + } + + protected void bridgeAllBrokers(String groupName, int ttl, boolean suppressduplicateQueueSubs) throws Exception { + bridgeAllBrokers(groupName, ttl, suppressduplicateQueueSubs, false); } - protected void bridgeAllBrokers(String groupName, int ttl, boolean suppressduplicateQueueSubs) throws Exception { + protected void bridgeAllBrokers(String groupName, int ttl, boolean suppressduplicateQueueSubs, boolean decreasePriority) throws Exception { Collection brokerList = brokers.values(); for (Iterator i = brokerList.iterator(); i.hasNext();) { BrokerService broker = i.next().broker; @@ -134,6 +138,7 @@ public class JmsMultipleBrokersTestSupport extends CombinationTestSupport { NetworkConnector nc = broker.addNetworkConnector("multicast://default?group=" + groupName); nc.setNetworkTTL(ttl); nc.setSuppressDuplicateQueueSubscriptions(suppressduplicateQueueSubs); + nc.setDecreaseNetworkConsumerPriority(decreasePriority); } // Multicasting may take longer to setup diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java index 07aa078d5f..fb74f87936 100644 --- a/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java @@ -20,6 +20,8 @@ import java.net.URI; import java.util.Collection; import java.util.HashMap; import java.util.Iterator; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import javax.jms.Destination; import javax.jms.MessageConsumer; @@ -30,11 +32,14 @@ import org.apache.activemq.broker.region.Queue; import org.apache.activemq.broker.region.RegionBroker; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.util.MessageIdList; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; /** * @version $Revision: 1.1.1.1 $ */ public class ThreeBrokerQueueNetworkTest extends JmsMultipleBrokersTestSupport { + private static final Log LOG = LogFactory.getLog(ThreeBrokerQueueNetworkTest.class); protected static final int MESSAGE_COUNT = 100; /** @@ -243,9 +248,6 @@ public class ThreeBrokerQueueNetworkTest extends JmsMultipleBrokersTestSupport { assertEquals(MESSAGE_COUNT * 3, msgsA.getMessageCount() + msgsB.getMessageCount() + msgsC.getMessageCount()); } - /** - * BrokerA <-> BrokerB <-> BrokerC - */ public void testAllConnectedUsingMulticast() throws Exception { // Setup broker networks bridgeAllBrokers(); @@ -276,6 +278,156 @@ public class ThreeBrokerQueueNetworkTest extends JmsMultipleBrokersTestSupport { assertEquals(MESSAGE_COUNT * 3, msgsA.getMessageCount() + msgsB.getMessageCount() + msgsC.getMessageCount()); } + + public void testAllConnectedUsingMulticastProducerConsumerOnA() throws Exception { + bridgeAllBrokers("default", 3, false); + startAllBrokers(); + + // Setup destination + Destination dest = createDestination("TEST.FOO", false); + + // Setup consumers + int messageCount = 2000; + CountDownLatch messagesReceived = new CountDownLatch(messageCount); + MessageConsumer clientA = createConsumer("BrokerA", dest, messagesReceived); + + // Let's try to wait for advisory percolation. + Thread.sleep(1000); + + // Send messages + sendMessages("BrokerA", dest, messageCount); + + assertTrue(messagesReceived.await(30, TimeUnit.SECONDS)); + + // Get message count + MessageIdList msgsA = getConsumerMessages("BrokerA", clientA); + assertEquals(messageCount, msgsA.getMessageCount()); + } + + public void testAllConnectedWithSpare() throws Exception { + bridgeAllBrokers("default", 3, false); + startAllBrokers(); + + // Setup destination + Destination dest = createDestination("TEST.FOO", false); + + // Setup consumers + int messageCount = 2000; + CountDownLatch messagesReceived = new CountDownLatch(messageCount); + MessageConsumer clientA = createConsumer("BrokerA", dest, messagesReceived); + + // ensure advisory percolation. + Thread.sleep(1000); + + // Send messages + sendMessages("BrokerB", dest, messageCount); + + assertTrue(messagesReceived.await(30, TimeUnit.SECONDS)); + + // Get message count + MessageIdList msgsA = getConsumerMessages("BrokerA", clientA); + assertEquals(messageCount, msgsA.getMessageCount()); + } + + public void testMigrateConsumerStuckMessages() throws Exception { + boolean suppressQueueDuplicateSubscriptions = false; + bridgeAllBrokers("default", 3, suppressQueueDuplicateSubscriptions); + startAllBrokers(); + + // Setup destination + Destination dest = createDestination("TEST.FOO", false); + + // Setup consumers + LOG.info("Consumer on A"); + MessageConsumer clientA = createConsumer("BrokerA", dest); + + // ensure advisors have percolated + Thread.sleep(500); + + LOG.info("Consumer on B"); + int messageCount = 2000; + + // will only get half of the messages + CountDownLatch messagesReceived = new CountDownLatch(messageCount/2); + MessageConsumer clientB = createConsumer("BrokerB", dest, messagesReceived); + + // ensure advisors have percolated + Thread.sleep(500); + + LOG.info("Close consumer on A"); + clientA.close(); + + // ensure advisors have percolated + Thread.sleep(500); + + LOG.info("Send to B"); + sendMessages("BrokerB", dest, messageCount); + + // Let's try to wait for any messages. + assertTrue(messagesReceived.await(30, TimeUnit.SECONDS)); + + // Get message count + MessageIdList msgs = getConsumerMessages("BrokerB", clientB); + + // see will any more arrive + Thread.sleep(500); + assertEquals(messageCount/2, msgs.getMessageCount()); + + // pick up the stuck messages + messagesReceived = new CountDownLatch(messageCount/2); + clientA = createConsumer("BrokerA", dest, messagesReceived); + // Let's try to wait for any messages. + assertTrue(messagesReceived.await(30, TimeUnit.SECONDS)); + + msgs = getConsumerMessages("BrokerA", clientA); + assertEquals(messageCount/2, msgs.getMessageCount()); + } + + // use case: for maintenance, migrate consumers and producers from one + // node in the network to another so node can be replaced/updated + public void testMigrateConsumer() throws Exception { + boolean suppressQueueDuplicateSubscriptions = true; + boolean decreaseNetworkConsumerPriority = true; + bridgeAllBrokers("default", 3, suppressQueueDuplicateSubscriptions, decreaseNetworkConsumerPriority); + startAllBrokers(); + + // Setup destination + Destination dest = createDestination("TEST.FOO", false); + + // Setup consumers + LOG.info("Consumer on A"); + MessageConsumer clientA = createConsumer("BrokerA", dest); + + // ensure advisors have percolated + Thread.sleep(500); + + LOG.info("Consumer on B"); + int messageCount = 2000; + CountDownLatch messagesReceived = new CountDownLatch(messageCount); + MessageConsumer clientB = createConsumer("BrokerB", dest, messagesReceived); + + // make the consumer slow so that any network consumer has a chance, even + // if it has a lower priority + MessageIdList msgs = getConsumerMessages("BrokerB", clientB); + msgs.setProcessingDelay(10); + + // ensure advisors have percolated + Thread.sleep(500); + + LOG.info("Close consumer on A"); + clientA.close(); + + // ensure advisors have percolated + Thread.sleep(500); + + LOG.info("Send to B"); + sendMessages("BrokerB", dest, messageCount); + + // Let's try to wait for any messages. + assertTrue(messagesReceived.await(30, TimeUnit.SECONDS)); + assertEquals(messageCount, msgs.getMessageCount()); + } + public void testNoDuplicateQueueSubs() throws Exception { bridgeAllBrokers("default", 3, true); diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerMessageNotSentToRemoteWhenNoConsumerTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerMessageNotSentToRemoteWhenNoConsumerTest.java index 5eb88776d0..8cc27c9146 100644 --- a/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerMessageNotSentToRemoteWhenNoConsumerTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerMessageNotSentToRemoteWhenNoConsumerTest.java @@ -98,6 +98,40 @@ public class TwoBrokerMessageNotSentToRemoteWhenNoConsumerTest extends JmsMultip assertEquals(MESSAGE_COUNT, msgsA.getMessageCount()); } + + /** + * BrokerA -> BrokerB && BrokerB -> BrokerA + */ + public void testDuplexStaticRemoteBrokerHasNoConsumer() throws Exception { + // Setup broker networks + boolean dynamicOnly = true; + int networkTTL = 2; + boolean conduit = true; + bridgeBrokers("BrokerA", "BrokerB", dynamicOnly, networkTTL, conduit); + bridgeBrokers("BrokerB", "BrokerA", dynamicOnly, networkTTL, conduit); + + startAllBrokers(); + + // Setup destination + Destination dest = createDestination("TEST.FOO", false); + + // Setup consumers + MessageConsumer clientA = createConsumer("BrokerA", dest); + + Thread.sleep(2*1000); + + int messageCount = 2000; + // Send messages + sendMessages("BrokerA", dest, messageCount); + + // Get message count + MessageIdList msgsA = getConsumerMessages("BrokerA", clientA); + + msgsA.waitForMessagesToArrive(messageCount); + + assertEquals(messageCount, msgsA.getMessageCount()); + + } public void setUp() throws Exception { super.setAutoFail(true);