https://issues.apache.org/jira/browse/AMQ-5791 - apply patch from Vladimír Čaniga with thanks

This commit is contained in:
gtully 2015-05-21 15:53:40 +01:00
parent 9becfc0bed
commit 5ee9a3426f
1 changed files with 61 additions and 24 deletions

View File

@ -107,7 +107,7 @@ public class TwoBrokerVirtualTopicSelectorAwareForwardingTest extends
} }
public void testMessageLeaks() throws Exception{ public void testMessageLeaks() throws Exception {
clearSelectorCacheFiles(); clearSelectorCacheFiles();
startAllBrokers(); startAllBrokers();
@ -207,7 +207,7 @@ public class TwoBrokerVirtualTopicSelectorAwareForwardingTest extends
} }
private ProducerThreadTester createProducerTester(String brokerName, javax.jms.Destination destination) throws Exception{ private ProducerThreadTester createProducerTester(String brokerName, javax.jms.Destination destination) throws Exception {
BrokerItem brokerItem = brokers.get(brokerName); BrokerItem brokerItem = brokers.get(brokerName);
Connection conn = brokerItem.createConnection(); Connection conn = brokerItem.createConnection();
@ -218,7 +218,7 @@ public class TwoBrokerVirtualTopicSelectorAwareForwardingTest extends
return rc; return rc;
} }
public void testSelectorConsumptionWithNoMatchAtHeadOfQueue() throws Exception{ public void testSelectorConsumptionWithNoMatchAtHeadOfQueue() throws Exception {
clearSelectorCacheFiles(); clearSelectorCacheFiles();
startAllBrokers(); startAllBrokers();
@ -251,6 +251,7 @@ public class TwoBrokerVirtualTopicSelectorAwareForwardingTest extends
assertEquals(1, selectingConsumerMessages.getMessageCount()); assertEquals(1, selectingConsumerMessages.getMessageCount());
// assert broker A stats // assert broker A stats
waitForMessagesToBeConsumed(brokerA, "Consumer.B.VirtualTopic.tempTopic", false, 2, 1, 5000);
assertEquals(1, brokerA.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")) assertEquals(1, brokerA.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"))
.getConsumers().size()); .getConsumers().size());
assertEquals(2, brokerA.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")) assertEquals(2, brokerA.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"))
@ -262,7 +263,7 @@ public class TwoBrokerVirtualTopicSelectorAwareForwardingTest extends
} }
private MessageConsumer establishConsumer(String broker, ActiveMQDestination consumerQueue) throws Exception{ private MessageConsumer establishConsumer(String broker, ActiveMQDestination consumerQueue) throws Exception {
BrokerItem item = brokers.get(broker); BrokerItem item = brokers.get(broker);
Connection c = item.createConnection(); Connection c = item.createConnection();
c.start(); c.start();
@ -270,7 +271,7 @@ public class TwoBrokerVirtualTopicSelectorAwareForwardingTest extends
return s.createConsumer(consumerQueue); return s.createConsumer(consumerQueue);
} }
public void testSelectorsAndNonSelectors() throws Exception{ public void testSelectorsAndNonSelectors() throws Exception {
clearSelectorCacheFiles(); clearSelectorCacheFiles();
// borkerA is local and brokerB is remote // borkerA is local and brokerB is remote
bridgeAndConfigureBrokers("BrokerA", "BrokerB"); bridgeAndConfigureBrokers("BrokerA", "BrokerB");
@ -320,6 +321,7 @@ public class TwoBrokerVirtualTopicSelectorAwareForwardingTest extends
assertEquals(15, nonSelectingConsumerMessages.getMessageCount()); assertEquals(15, nonSelectingConsumerMessages.getMessageCount());
// assert broker A stats // assert broker A stats
waitForMessagesToBeConsumed(brokerA, "Consumer.B.VirtualTopic.tempTopic", false, 20, 20, 5000);
assertEquals(20, brokerA.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")) assertEquals(20, brokerA.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"))
.getDestinationStatistics().getEnqueues().getCount()); .getDestinationStatistics().getEnqueues().getCount());
assertEquals(20, brokerA.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")) assertEquals(20, brokerA.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"))
@ -328,6 +330,7 @@ public class TwoBrokerVirtualTopicSelectorAwareForwardingTest extends
.getDestinationStatistics().getMessages().getCount()); .getDestinationStatistics().getMessages().getCount());
// assert broker B stats // assert broker B stats
waitForMessagesToBeConsumed(brokerB, "Consumer.B.VirtualTopic.tempTopic", false, 20, 20, 5000);
assertEquals(20, brokerB.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")) assertEquals(20, brokerB.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"))
.getDestinationStatistics().getEnqueues().getCount()); .getDestinationStatistics().getEnqueues().getCount());
assertEquals(20, brokerB.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")) assertEquals(20, brokerB.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"))
@ -357,9 +360,10 @@ public class TwoBrokerVirtualTopicSelectorAwareForwardingTest extends
selectingConsumerMessages = getConsumerMessages("BrokerB", selectingConsumer); selectingConsumerMessages = getConsumerMessages("BrokerB", selectingConsumer);
selectingConsumerMessages.waitForMessagesToArrive(1, 1000L); selectingConsumerMessages.waitForMessagesToArrive(1, 1000L);
assertEquals(0, selectingConsumerMessages.getMessageCount()) ; assertEquals(0, selectingConsumerMessages.getMessageCount());
// assert broker A stats // assert broker A stats
waitForMessagesToBeConsumed(brokerA, "Consumer.B.VirtualTopic.tempTopic", false, 20, 20, 5000);
assertEquals(20, brokerA.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")) assertEquals(20, brokerA.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"))
.getDestinationStatistics().getEnqueues().getCount()); .getDestinationStatistics().getEnqueues().getCount());
assertEquals(20, brokerA.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")) assertEquals(20, brokerA.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"))
@ -368,6 +372,7 @@ public class TwoBrokerVirtualTopicSelectorAwareForwardingTest extends
.getDestinationStatistics().getMessages().getCount()); .getDestinationStatistics().getMessages().getCount());
// assert broker B stats // assert broker B stats
waitForMessagesToBeConsumed(brokerB, "Consumer.B.VirtualTopic.tempTopic", false, 20, 20, 5000);
assertEquals(20, brokerB.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")) assertEquals(20, brokerB.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"))
.getDestinationStatistics().getEnqueues().getCount()); .getDestinationStatistics().getEnqueues().getCount());
assertEquals(20, brokerB.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")) assertEquals(20, brokerB.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"))
@ -394,6 +399,7 @@ public class TwoBrokerVirtualTopicSelectorAwareForwardingTest extends
// assert broker A stats // assert broker A stats
waitForMessagesToBeConsumed(brokerA, "Consumer.B.VirtualTopic.tempTopic", false, 20, 20, 5000);
assertEquals(30, brokerA.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")) assertEquals(30, brokerA.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"))
.getDestinationStatistics().getEnqueues().getCount()); .getDestinationStatistics().getEnqueues().getCount());
assertEquals(20, brokerA.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")) assertEquals(20, brokerA.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"))
@ -402,6 +408,7 @@ public class TwoBrokerVirtualTopicSelectorAwareForwardingTest extends
.getDestinationStatistics().getMessages().getCount()); .getDestinationStatistics().getMessages().getCount());
// assert broker B stats // assert broker B stats
waitForMessagesToBeConsumed(brokerB, "Consumer.B.VirtualTopic.tempTopic", false, 20, 20, 5000);
assertEquals(20, brokerB.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")) assertEquals(20, brokerB.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"))
.getDestinationStatistics().getEnqueues().getCount()); .getDestinationStatistics().getEnqueues().getCount());
assertEquals(20, brokerB.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")) assertEquals(20, brokerB.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"))
@ -425,23 +432,16 @@ public class TwoBrokerVirtualTopicSelectorAwareForwardingTest extends
}, 500); }, 500);
// assert broker A stats // assert broker A stats
waitForMessagesToBeConsumed(brokerA, "Consumer.B.VirtualTopic.tempTopic", false, 30, 30, 5000);
assertEquals(30, brokerA.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")) assertEquals(30, brokerA.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"))
.getDestinationStatistics().getEnqueues().getCount()); .getDestinationStatistics().getEnqueues().getCount());
Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return brokerA.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"))
.getDestinationStatistics().getEnqueues().getCount() == 30;
}
}, 5000);
assertEquals(30, brokerA.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")) assertEquals(30, brokerA.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"))
.getDestinationStatistics().getDequeues().getCount()); .getDestinationStatistics().getDequeues().getCount());
assertEquals(0, brokerA.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")) assertEquals(0, brokerA.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"))
.getDestinationStatistics().getMessages().getCount()); .getDestinationStatistics().getMessages().getCount());
// assert broker B stats // assert broker B stats
waitForMessagesToBeConsumed(brokerB, "Consumer.B.VirtualTopic.tempTopic", false, 30, 30, 5000);
assertEquals(30, brokerB.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")) assertEquals(30, brokerB.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"))
.getDestinationStatistics().getEnqueues().getCount()); .getDestinationStatistics().getEnqueues().getCount());
assertEquals(30, brokerB.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")) assertEquals(30, brokerB.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"))
@ -456,7 +456,7 @@ public class TwoBrokerVirtualTopicSelectorAwareForwardingTest extends
throws MalformedObjectNameException { throws MalformedObjectNameException {
ObjectName objectName = BrokerMBeanSupport ObjectName objectName = BrokerMBeanSupport
.createVirtualDestinationSelectorCacheName(broker.getBrokerObjectName(), "plugin", "virtualDestinationCache"); .createVirtualDestinationSelectorCacheName(broker.getBrokerObjectName(), "plugin", "virtualDestinationCache");
return (VirtualDestinationSelectorCacheViewMBean)broker.getManagementContext() return (VirtualDestinationSelectorCacheViewMBean) broker.getManagementContext()
.newProxyInstance(objectName, VirtualDestinationSelectorCacheViewMBean.class, true); .newProxyInstance(objectName, VirtualDestinationSelectorCacheViewMBean.class, true);
} }
@ -517,8 +517,6 @@ public class TwoBrokerVirtualTopicSelectorAwareForwardingTest extends
remoteConsumer.close(); remoteConsumer.close();
// now let's shut down broker A and clear its persistent selector cache // now let's shut down broker A and clear its persistent selector cache
brokerA.stop(); brokerA.stop();
brokerA.waitUntilStopped(); brokerA.waitUntilStopped();
@ -581,13 +579,12 @@ public class TwoBrokerVirtualTopicSelectorAwareForwardingTest extends
} }
private HashMap<String, Object> asMap(String key, Object value) { private HashMap<String, Object> asMap(String key, Object value) {
HashMap<String, Object> rc = new HashMap<String,Object>(1); HashMap<String, Object> rc = new HashMap<String, Object>(1);
rc.put(key, value); rc.put(key, value);
return rc; return rc;
} }
private void bridgeAndConfigureBrokers(String local, String remote) private void bridgeAndConfigureBrokers(String local, String remote)
throws Exception { throws Exception {
NetworkConnector bridge = bridgeBrokers(local, remote, false, 1, false); NetworkConnector bridge = bridgeBrokers(local, remote, false, 1, false);
@ -628,9 +625,8 @@ public class TwoBrokerVirtualTopicSelectorAwareForwardingTest extends
VirtualTopic virtualTopic = new VirtualTopic(); VirtualTopic virtualTopic = new VirtualTopic();
virtualTopic.setSelectorAware(true); virtualTopic.setSelectorAware(true);
VirtualDestinationInterceptor interceptor = new VirtualDestinationInterceptor(); VirtualDestinationInterceptor interceptor = new VirtualDestinationInterceptor();
interceptor interceptor.setVirtualDestinations(new VirtualDestination[]{virtualTopic});
.setVirtualDestinations(new VirtualDestination[] { virtualTopic }); broker.setDestinationInterceptors(new DestinationInterceptor[]{interceptor});
broker.setDestinationInterceptors(new DestinationInterceptor[] { interceptor });
configurePersistenceAdapter(broker); configurePersistenceAdapter(broker);
SubQueueSelectorCacheBrokerPlugin selectorCacheBrokerPlugin = new SubQueueSelectorCacheBrokerPlugin(); SubQueueSelectorCacheBrokerPlugin selectorCacheBrokerPlugin = new SubQueueSelectorCacheBrokerPlugin();
@ -650,6 +646,47 @@ public class TwoBrokerVirtualTopicSelectorAwareForwardingTest extends
broker.setPersistenceAdapter(kaha); broker.setPersistenceAdapter(kaha);
} }
/**
* Typically used before asserts to give producers and consumers some time to finish their tasks
* before the final state is tested.
*
* @param broker BrokerService on which the destinations are looked up
* @param destinationName
* @param topic true if the destination is a Topic, false if it is a Queue
* @param numEnqueueMsgs expected number of enqueued messages in the destination
* @param numDequeueMsgs expected number of dequeued messages in the destination
* @param waitTime number of milliseconds to wait for completion
* @throws Exception
*/
private void waitForMessagesToBeConsumed(final BrokerService broker, final String destinationName,
final boolean topic, final int numEnqueueMsgs, final int numDequeueMsgs, int waitTime) throws Exception {
final ActiveMQDestination destination;
if (topic) {
destination = new ActiveMQTopic(destinationName);
} else {
destination = new ActiveMQQueue(destinationName);
}
Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return broker.getDestination(destination)
.getDestinationStatistics().getEnqueues().getCount() == numEnqueueMsgs;
}
}, waitTime);
Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return broker.getDestination(destination)
.getDestinationStatistics().getDequeues().getCount() == numDequeueMsgs;
}
}, waitTime);
}
class ProducerThreadTester extends ProducerThread { class ProducerThreadTester extends ProducerThread {
private Set<String> selectors = new LinkedHashSet<String>(); private Set<String> selectors = new LinkedHashSet<String>();