diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerVirtualDestDinamicallyIncludedDestTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerVirtualDestDinamicallyIncludedDestTest.java index 40366fff26..91349cb801 100644 --- a/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerVirtualDestDinamicallyIncludedDestTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerVirtualDestDinamicallyIncludedDestTest.java @@ -16,13 +16,6 @@ */ package org.apache.activemq.usecases; -import java.io.File; -import java.io.IOException; -import java.net.URI; - -import javax.jms.Destination; -import javax.jms.MessageConsumer; - import org.apache.activemq.JmsMultipleBrokersTestSupport; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.region.DestinationInterceptor; @@ -34,6 +27,12 @@ import org.apache.activemq.network.NetworkConnector; import org.apache.activemq.store.kahadb.KahaDBStore; import org.apache.activemq.util.MessageIdList; +import javax.jms.Destination; +import javax.jms.MessageConsumer; +import java.io.File; +import java.io.IOException; +import java.net.URI; + public class TwoBrokerVirtualDestDinamicallyIncludedDestTest extends JmsMultipleBrokersTestSupport { protected static final int MESSAGE_COUNT = 10; boolean dynamicOnly = true; @@ -128,6 +127,55 @@ public class TwoBrokerVirtualDestDinamicallyIncludedDestTest extends JmsMultiple assertEquals(MESSAGE_COUNT, msgsB2.getMessageCount()); } + + /** + * BrokerA -> BrokerB && BrokerB -> BrokerA + */ + public void testVirtualDestinationsDinamicallyIncludedBehavior3() throws Exception { + final String topic = "global.test"; + final String vq = "Consumer.foo." + topic; + + startAllBrokers(); + final int msgs1 = 1001; + final int msgs2 = 1456; + + // Setup destination + Destination tDest = createDestination(topic, true); + Destination vqDest = createDestination(vq, false); + + // Setup consumers + MessageConsumer clientB1t = createConsumer("BrokerA", tDest); + MessageConsumer clientB2t = createConsumer("BrokerB", tDest); + MessageConsumer clientB1vq = createConsumer("BrokerA", vqDest); + + Thread.sleep(2*1000); + + // Send messages + sendMessages("BrokerA", tDest, msgs1); + sendMessages("BrokerB", tDest, msgs2); + + Thread.sleep(5000); + + // Get message count + MessageIdList msgsB1t = getConsumerMessages("BrokerA", clientB1t); + msgsB1t.waitForMessagesToArrive(msgs1 + msgs2); + assertEquals(msgs1 + msgs2, msgsB1t.getMessageCount()); + MessageIdList msgsB2t = getConsumerMessages("BrokerB", clientB2t); + msgsB2t.waitForMessagesToArrive(msgs1 + msgs2); + assertEquals(msgs1 + msgs2, msgsB2t.getMessageCount()); + MessageIdList msgsB1vq = getConsumerMessages("BrokerA", clientB1vq); + msgsB1vq.waitForMessagesToArrive(msgs1 + msgs2); + assertEquals(msgs1 + msgs2, msgsB1vq.getMessageCount()); + + assertEquals(0, getQueueSize("BrokerA", (ActiveMQDestination)vqDest)); + assertEquals(0, getQueueSize("BrokerB", (ActiveMQDestination)vqDest)); + destroyAllBrokers(); + } + + public long getQueueSize(String broker, ActiveMQDestination destination) throws Exception { + BrokerItem bi = brokers.get(broker); + return bi.broker.getDestination(destination).getDestinationStatistics().getMessages().getCount(); + } public void setUp() throws Exception { super.setAutoFail(true); @@ -140,15 +188,20 @@ public class TwoBrokerVirtualDestDinamicallyIncludedDestTest extends JmsMultiple NetworkConnector nc1 = bridgeBrokers("BrokerA", "BrokerB", dynamicOnly, networkTTL, conduit); nc1.setDecreaseNetworkConsumerPriority(decreaseNetworkConsumerPriority); nc1.setSuppressDuplicateQueueSubscriptions(suppressDuplicateQueueSubscriptions); + nc1.addStaticallyIncludedDestination(ActiveMQDestination.createDestination("global.>", ActiveMQDestination.TOPIC_TYPE)); + nc1.addExcludedDestination(ActiveMQDestination.createDestination("Consumer.*.global.>", ActiveMQDestination.QUEUE_TYPE)); nc1.addDynamicallyIncludedDestination(ActiveMQDestination.createDestination("global.>", ActiveMQDestination.QUEUE_TYPE)); nc1.addDynamicallyIncludedDestination(ActiveMQDestination.createDestination("global.>", ActiveMQDestination.TOPIC_TYPE)); - nc1.addDynamicallyIncludedDestination(ActiveMQDestination.createDestination("Consumer.>", ActiveMQDestination.QUEUE_TYPE)); + //nc1.addDynamicallyIncludedDestination(ActiveMQDestination.createDestination("Consumer.*.global.>", ActiveMQDestination.QUEUE_TYPE)); NetworkConnector nc2 = bridgeBrokers("BrokerB", "BrokerA", dynamicOnly, networkTTL, conduit); nc2.setDecreaseNetworkConsumerPriority(decreaseNetworkConsumerPriority); nc2.setSuppressDuplicateQueueSubscriptions(suppressDuplicateQueueSubscriptions); + nc2.addStaticallyIncludedDestination(ActiveMQDestination.createDestination("global.>", ActiveMQDestination.TOPIC_TYPE)); + nc2.addExcludedDestination(ActiveMQDestination.createDestination("Consumer.*.global.>", ActiveMQDestination.QUEUE_TYPE)); nc2.addDynamicallyIncludedDestination(ActiveMQDestination.createDestination("global.>", ActiveMQDestination.QUEUE_TYPE)); nc2.addDynamicallyIncludedDestination(ActiveMQDestination.createDestination("global.>", ActiveMQDestination.TOPIC_TYPE)); + //nc2.addDynamicallyIncludedDestination(ActiveMQDestination.createDestination("Consumer.*.global.>", ActiveMQDestination.QUEUE_TYPE)); } private BrokerService createAndConfigureBroker(URI uri) throws Exception { @@ -168,6 +221,7 @@ public class TwoBrokerVirtualDestDinamicallyIncludedDestTest extends JmsMultiple File dataFileDir = new File("target/test-amq-data/kahadb/" + broker.getBrokerName()); KahaDBStore kaha = new KahaDBStore(); kaha.setDirectory(dataFileDir); + kaha.deleteAllMessages(); broker.setPersistenceAdapter(kaha); } } \ No newline at end of file