From 5ee9a3426f2837d88ed5b315c4543da25fd1c9db Mon Sep 17 00:00:00 2001 From: gtully Date: Thu, 21 May 2015 15:53:40 +0100 Subject: [PATCH] =?UTF-8?q?https://issues.apache.org/jira/browse/AMQ-5791?= =?UTF-8?q?=20-=20apply=20patch=20from=20Vladim=C3=ADr=20=C4=8Caniga=20wit?= =?UTF-8?q?h=20thanks?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ...rtualTopicSelectorAwareForwardingTest.java | 85 +++++++++++++------ 1 file changed, 61 insertions(+), 24 deletions(-) diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TwoBrokerVirtualTopicSelectorAwareForwardingTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TwoBrokerVirtualTopicSelectorAwareForwardingTest.java index 63fdd5a6d3..d1be900e76 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TwoBrokerVirtualTopicSelectorAwareForwardingTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TwoBrokerVirtualTopicSelectorAwareForwardingTest.java @@ -107,7 +107,7 @@ public class TwoBrokerVirtualTopicSelectorAwareForwardingTest extends } - public void testMessageLeaks() throws Exception{ + public void testMessageLeaks() throws Exception { clearSelectorCacheFiles(); startAllBrokers(); @@ -207,7 +207,7 @@ public class TwoBrokerVirtualTopicSelectorAwareForwardingTest extends } - private ProducerThreadTester createProducerTester(String brokerName, javax.jms.Destination destination) throws Exception{ + private ProducerThreadTester createProducerTester(String brokerName, javax.jms.Destination destination) throws Exception { BrokerItem brokerItem = brokers.get(brokerName); Connection conn = brokerItem.createConnection(); @@ -218,7 +218,7 @@ public class TwoBrokerVirtualTopicSelectorAwareForwardingTest extends return rc; } - public void testSelectorConsumptionWithNoMatchAtHeadOfQueue() throws Exception{ + public void testSelectorConsumptionWithNoMatchAtHeadOfQueue() throws Exception { clearSelectorCacheFiles(); startAllBrokers(); @@ -251,6 +251,7 @@ public class TwoBrokerVirtualTopicSelectorAwareForwardingTest extends assertEquals(1, selectingConsumerMessages.getMessageCount()); // assert broker A stats + waitForMessagesToBeConsumed(brokerA, "Consumer.B.VirtualTopic.tempTopic", false, 2, 1, 5000); assertEquals(1, brokerA.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")) .getConsumers().size()); assertEquals(2, brokerA.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")) @@ -262,7 +263,7 @@ public class TwoBrokerVirtualTopicSelectorAwareForwardingTest extends } - private MessageConsumer establishConsumer(String broker, ActiveMQDestination consumerQueue) throws Exception{ + private MessageConsumer establishConsumer(String broker, ActiveMQDestination consumerQueue) throws Exception { BrokerItem item = brokers.get(broker); Connection c = item.createConnection(); c.start(); @@ -270,7 +271,7 @@ public class TwoBrokerVirtualTopicSelectorAwareForwardingTest extends return s.createConsumer(consumerQueue); } - public void testSelectorsAndNonSelectors() throws Exception{ + public void testSelectorsAndNonSelectors() throws Exception { clearSelectorCacheFiles(); // borkerA is local and brokerB is remote bridgeAndConfigureBrokers("BrokerA", "BrokerB"); @@ -320,6 +321,7 @@ public class TwoBrokerVirtualTopicSelectorAwareForwardingTest extends assertEquals(15, nonSelectingConsumerMessages.getMessageCount()); // assert broker A stats + waitForMessagesToBeConsumed(brokerA, "Consumer.B.VirtualTopic.tempTopic", false, 20, 20, 5000); assertEquals(20, brokerA.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")) .getDestinationStatistics().getEnqueues().getCount()); assertEquals(20, brokerA.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")) @@ -328,6 +330,7 @@ public class TwoBrokerVirtualTopicSelectorAwareForwardingTest extends .getDestinationStatistics().getMessages().getCount()); // assert broker B stats + waitForMessagesToBeConsumed(brokerB, "Consumer.B.VirtualTopic.tempTopic", false, 20, 20, 5000); assertEquals(20, brokerB.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")) .getDestinationStatistics().getEnqueues().getCount()); assertEquals(20, brokerB.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")) @@ -357,9 +360,10 @@ public class TwoBrokerVirtualTopicSelectorAwareForwardingTest extends selectingConsumerMessages = getConsumerMessages("BrokerB", selectingConsumer); selectingConsumerMessages.waitForMessagesToArrive(1, 1000L); - assertEquals(0, selectingConsumerMessages.getMessageCount()) ; + assertEquals(0, selectingConsumerMessages.getMessageCount()); // assert broker A stats + waitForMessagesToBeConsumed(brokerA, "Consumer.B.VirtualTopic.tempTopic", false, 20, 20, 5000); assertEquals(20, brokerA.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")) .getDestinationStatistics().getEnqueues().getCount()); assertEquals(20, brokerA.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")) @@ -368,6 +372,7 @@ public class TwoBrokerVirtualTopicSelectorAwareForwardingTest extends .getDestinationStatistics().getMessages().getCount()); // assert broker B stats + waitForMessagesToBeConsumed(brokerB, "Consumer.B.VirtualTopic.tempTopic", false, 20, 20, 5000); assertEquals(20, brokerB.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")) .getDestinationStatistics().getEnqueues().getCount()); assertEquals(20, brokerB.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")) @@ -394,6 +399,7 @@ public class TwoBrokerVirtualTopicSelectorAwareForwardingTest extends // assert broker A stats + waitForMessagesToBeConsumed(brokerA, "Consumer.B.VirtualTopic.tempTopic", false, 20, 20, 5000); assertEquals(30, brokerA.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")) .getDestinationStatistics().getEnqueues().getCount()); assertEquals(20, brokerA.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")) @@ -402,6 +408,7 @@ public class TwoBrokerVirtualTopicSelectorAwareForwardingTest extends .getDestinationStatistics().getMessages().getCount()); // assert broker B stats + waitForMessagesToBeConsumed(brokerB, "Consumer.B.VirtualTopic.tempTopic", false, 20, 20, 5000); assertEquals(20, brokerB.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")) .getDestinationStatistics().getEnqueues().getCount()); assertEquals(20, brokerB.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")) @@ -425,23 +432,16 @@ public class TwoBrokerVirtualTopicSelectorAwareForwardingTest extends }, 500); // assert broker A stats + waitForMessagesToBeConsumed(brokerA, "Consumer.B.VirtualTopic.tempTopic", false, 30, 30, 5000); assertEquals(30, brokerA.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")) .getDestinationStatistics().getEnqueues().getCount()); - - Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - return brokerA.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")) - .getDestinationStatistics().getEnqueues().getCount() == 30; - } - }, 5000); - assertEquals(30, brokerA.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")) .getDestinationStatistics().getDequeues().getCount()); assertEquals(0, brokerA.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")) .getDestinationStatistics().getMessages().getCount()); // assert broker B stats + waitForMessagesToBeConsumed(brokerB, "Consumer.B.VirtualTopic.tempTopic", false, 30, 30, 5000); assertEquals(30, brokerB.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")) .getDestinationStatistics().getEnqueues().getCount()); assertEquals(30, brokerB.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")) @@ -456,7 +456,7 @@ public class TwoBrokerVirtualTopicSelectorAwareForwardingTest extends throws MalformedObjectNameException { ObjectName objectName = BrokerMBeanSupport .createVirtualDestinationSelectorCacheName(broker.getBrokerObjectName(), "plugin", "virtualDestinationCache"); - return (VirtualDestinationSelectorCacheViewMBean)broker.getManagementContext() + return (VirtualDestinationSelectorCacheViewMBean) broker.getManagementContext() .newProxyInstance(objectName, VirtualDestinationSelectorCacheViewMBean.class, true); } @@ -517,8 +517,6 @@ public class TwoBrokerVirtualTopicSelectorAwareForwardingTest extends remoteConsumer.close(); - - // now let's shut down broker A and clear its persistent selector cache brokerA.stop(); brokerA.waitUntilStopped(); @@ -581,13 +579,12 @@ public class TwoBrokerVirtualTopicSelectorAwareForwardingTest extends } private HashMap asMap(String key, Object value) { - HashMap rc = new HashMap(1); + HashMap rc = new HashMap(1); rc.put(key, value); return rc; } - private void bridgeAndConfigureBrokers(String local, String remote) throws Exception { NetworkConnector bridge = bridgeBrokers(local, remote, false, 1, false); @@ -628,9 +625,8 @@ public class TwoBrokerVirtualTopicSelectorAwareForwardingTest extends VirtualTopic virtualTopic = new VirtualTopic(); virtualTopic.setSelectorAware(true); VirtualDestinationInterceptor interceptor = new VirtualDestinationInterceptor(); - interceptor - .setVirtualDestinations(new VirtualDestination[] { virtualTopic }); - broker.setDestinationInterceptors(new DestinationInterceptor[] { interceptor }); + interceptor.setVirtualDestinations(new VirtualDestination[]{virtualTopic}); + broker.setDestinationInterceptors(new DestinationInterceptor[]{interceptor}); configurePersistenceAdapter(broker); SubQueueSelectorCacheBrokerPlugin selectorCacheBrokerPlugin = new SubQueueSelectorCacheBrokerPlugin(); @@ -650,6 +646,47 @@ public class TwoBrokerVirtualTopicSelectorAwareForwardingTest extends broker.setPersistenceAdapter(kaha); } + /** + * Typically used before asserts to give producers and consumers some time to finish their tasks + * before the final state is tested. + * + * @param broker BrokerService on which the destinations are looked up + * @param destinationName + * @param topic true if the destination is a Topic, false if it is a Queue + * @param numEnqueueMsgs expected number of enqueued messages in the destination + * @param numDequeueMsgs expected number of dequeued messages in the destination + * @param waitTime number of milliseconds to wait for completion + * @throws Exception + */ + private void waitForMessagesToBeConsumed(final BrokerService broker, final String destinationName, + final boolean topic, final int numEnqueueMsgs, final int numDequeueMsgs, int waitTime) throws Exception { + final ActiveMQDestination destination; + if (topic) { + destination = new ActiveMQTopic(destinationName); + } else { + destination = new ActiveMQQueue(destinationName); + } + + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + + return broker.getDestination(destination) + .getDestinationStatistics().getEnqueues().getCount() == numEnqueueMsgs; + } + }, waitTime); + + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + + return broker.getDestination(destination) + .getDestinationStatistics().getDequeues().getCount() == numDequeueMsgs; + } + }, waitTime); + } + + class ProducerThreadTester extends ProducerThread { private Set selectors = new LinkedHashSet(); @@ -699,4 +736,4 @@ public class TwoBrokerVirtualTopicSelectorAwareForwardingTest extends } } -} \ No newline at end of file +}