diff --git a/activemq-spring/src/test/java/org/apache/bugs/AMQ1730Test.java b/activemq-spring/src/test/java/org/apache/bugs/AMQ1730Test.java index 63766b28ee..6da4546722 100644 --- a/activemq-spring/src/test/java/org/apache/bugs/AMQ1730Test.java +++ b/activemq-spring/src/test/java/org/apache/bugs/AMQ1730Test.java @@ -17,12 +17,7 @@ package org.apache.bugs; -import junit.framework.TestCase; -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.jms.listener.DefaultMessageListenerContainer; +import java.util.concurrent.CountDownLatch; import javax.jms.Connection; import javax.jms.ConnectionFactory; @@ -33,17 +28,20 @@ import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; -import java.util.concurrent.CountDownLatch; + +import junit.framework.TestCase; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.jms.listener.DefaultMessageListenerContainer; public class AMQ1730Test extends TestCase { private static final Logger log = LoggerFactory.getLogger(AMQ1730Test.class); - - private static final String JMSX_DELIVERY_COUNT = "JMSXDeliveryCount"; - - BrokerService brokerService; private static final int MESSAGE_COUNT = 250; @@ -55,6 +53,7 @@ public class AMQ1730Test extends TestCase { @Override protected void setUp() throws Exception { super.setUp(); + brokerService = new BrokerService(); brokerService.addConnector("tcp://localhost:0"); brokerService.setUseJmx(false); @@ -65,6 +64,7 @@ public class AMQ1730Test extends TestCase { protected void tearDown() throws Exception { super.tearDown(); brokerService.stop(); + brokerService.waitUntilStopped(); } public void testRedelivery() throws Exception { @@ -109,7 +109,7 @@ public class AMQ1730Test extends TestCase { messageListenerContainer.setSessionTransacted(false); messageListenerContainer.setMessageListener(new MessageListener() { - + @Override public void onMessage(Message message) { if (!(message instanceof TextMessage)) { throw new RuntimeException(); @@ -159,7 +159,5 @@ public class AMQ1730Test extends TestCase { T get() { return value; } - } - } diff --git a/activemq-spring/src/test/java/org/apache/bugs/AMQ2754Test.java b/activemq-spring/src/test/java/org/apache/bugs/AMQ2754Test.java index a1f87a9d2a..22a2ba0935 100644 --- a/activemq-spring/src/test/java/org/apache/bugs/AMQ2754Test.java +++ b/activemq-spring/src/test/java/org/apache/bugs/AMQ2754Test.java @@ -35,11 +35,11 @@ import org.apache.activemq.broker.BrokerService; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.network.NetworkConnector; import org.apache.activemq.pool.PooledConnectionFactory; -//import org.apache.activemq.pool.PooledConnectionFactory; import org.apache.activemq.store.memory.MemoryPersistenceAdapter; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; import org.springframework.jms.listener.DefaultMessageListenerContainer; +//import org.apache.activemq.pool.PooledConnectionFactory; public class AMQ2754Test extends TestCase { @@ -47,110 +47,117 @@ public class AMQ2754Test extends TestCase { BrokerService brokerService1 = null; BrokerService brokerService2 = null; + String broker1Uri; + String broker2Uri; + final int total = 100; final CountDownLatch latch = new CountDownLatch(total); final boolean conduitSubscriptions = true; try { - { - brokerService1 = new BrokerService(); - brokerService1.setBrokerName("consumer"); - brokerService1.setUseJmx(false); - brokerService1.setPersistenceAdapter(new MemoryPersistenceAdapter()); - brokerService1.addConnector("tcp://0.0.0.0:61616"); - brokerService1.start(); - } - - { - brokerService2 = new BrokerService(); - brokerService2.setBrokerName("producer"); - brokerService2.setUseJmx(false); - brokerService2.setPersistenceAdapter(new MemoryPersistenceAdapter()); - brokerService2.addConnector("tcp://0.0.0.0:51515"); - NetworkConnector network2 = brokerService2.addNetworkConnector("static:(tcp://localhost:61616)"); - network2.setName("network1"); - network2.setDynamicOnly(true); - network2.setConduitSubscriptions(conduitSubscriptions); - network2.setNetworkTTL(3); - network2.setPrefetchSize(1); - brokerService2.start(); - } - - ExecutorService pool = Executors.newSingleThreadExecutor(); - - ActiveMQConnectionFactory connectionFactory1 = - new ActiveMQConnectionFactory("failover:(tcp://localhost:61616)"); - - connectionFactory1.setWatchTopicAdvisories(false); - final DefaultMessageListenerContainer container = new DefaultMessageListenerContainer(); - container.setConnectionFactory(connectionFactory1); - container.setMaxConcurrentConsumers(10); - container.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE); - container.setCacheLevel(DefaultMessageListenerContainer.CACHE_CONSUMER); - container.setDestination(new ActiveMQQueue("testingqueue")); - container.setMessageListener(new MessageListener() { - public void onMessage(Message message) { - latch.countDown(); + { + brokerService1 = new BrokerService(); + brokerService1.setBrokerName("consumer"); + brokerService1.setUseJmx(false); + brokerService1.setPersistenceAdapter(new MemoryPersistenceAdapter()); + broker1Uri = brokerService1.addConnector("tcp://0.0.0.0:0").getPublishableConnectString(); + brokerService1.start(); } - }); - container.setMaxMessagesPerTask(1); - container.afterPropertiesSet(); - container.start(); - pool.submit(new Callable() { - public Object call() throws Exception { - try { - final int batch = 10; - ActiveMQConnectionFactory connectionFactory2 = - new ActiveMQConnectionFactory("failover:(tcp://localhost:51515)"); - PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory(connectionFactory2); - connectionFactory2.setWatchTopicAdvisories(false); - JmsTemplate template = new JmsTemplate(pooledConnectionFactory); - ActiveMQQueue queue = new ActiveMQQueue("testingqueue"); - for(int b = 0; b < batch; b++) { - for(int i = 0; i < (total / batch); i++) { - final String id = ":batch=" + b + "i=" + i; - template.send(queue, new MessageCreator() { - public Message createMessage(Session session) throws JMSException { - TextMessage message = session.createTextMessage(); - message.setText("Hello World!" + id); - return message; - } - }); - } - // give spring time to scale back again - while(container.getActiveConsumerCount() > 1) { - System.out.println("active consumer count:" + container.getActiveConsumerCount()); - System.out.println("concurrent consumer count: " + container.getConcurrentConsumers()); - Thread.sleep(1000); - } - } - //pooledConnectionFactory.stop(); - } catch(Throwable t) { - t.printStackTrace(); + { + brokerService2 = new BrokerService(); + brokerService2.setBrokerName("producer"); + brokerService2.setUseJmx(false); + brokerService2.setPersistenceAdapter(new MemoryPersistenceAdapter()); + broker2Uri = brokerService2.addConnector("tcp://0.0.0.0:0").getPublishableConnectString(); + NetworkConnector network2 = brokerService2.addNetworkConnector("static:("+broker1Uri+")"); + network2.setName("network1"); + network2.setDynamicOnly(true); + network2.setConduitSubscriptions(conduitSubscriptions); + network2.setNetworkTTL(3); + network2.setPrefetchSize(1); + brokerService2.start(); + } + + ExecutorService pool = Executors.newSingleThreadExecutor(); + + ActiveMQConnectionFactory connectionFactory1 = + new ActiveMQConnectionFactory("failover:("+broker1Uri+")"); + + connectionFactory1.setWatchTopicAdvisories(false); + final DefaultMessageListenerContainer container = new DefaultMessageListenerContainer(); + container.setConnectionFactory(connectionFactory1); + container.setMaxConcurrentConsumers(10); + container.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE); + container.setCacheLevel(DefaultMessageListenerContainer.CACHE_CONSUMER); + container.setDestination(new ActiveMQQueue("testingqueue")); + container.setMessageListener(new MessageListener() { + @Override + public void onMessage(Message message) { + latch.countDown(); } - return null; + }); + container.setMaxMessagesPerTask(1); + container.afterPropertiesSet(); + container.start(); + + final String finalBroker2Uri = broker2Uri; + + pool.submit(new Callable() { + @Override + public Object call() throws Exception { + try { + final int batch = 10; + ActiveMQConnectionFactory connectionFactory2 = + new ActiveMQConnectionFactory("failover:("+finalBroker2Uri+")"); + PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory(connectionFactory2); + connectionFactory2.setWatchTopicAdvisories(false); + JmsTemplate template = new JmsTemplate(pooledConnectionFactory); + ActiveMQQueue queue = new ActiveMQQueue("testingqueue"); + for(int b = 0; b < batch; b++) { + for(int i = 0; i < (total / batch); i++) { + final String id = ":batch=" + b + "i=" + i; + template.send(queue, new MessageCreator() { + @Override + public Message createMessage(Session session) throws JMSException { + TextMessage message = session.createTextMessage(); + message.setText("Hello World!" + id); + return message; + } + }); + } + // give spring time to scale back again + while(container.getActiveConsumerCount() > 1) { + System.out.println("active consumer count:" + container.getActiveConsumerCount()); + System.out.println("concurrent consumer count: " + container.getConcurrentConsumers()); + Thread.sleep(1000); + } + } + //pooledConnectionFactory.stop(); + } catch(Throwable t) { + t.printStackTrace(); + } + return null; + } + }); + + pool.shutdown(); + pool.awaitTermination(10, TimeUnit.SECONDS); + + int count = 0; + + // give it 20 seconds + while(!latch.await(1, TimeUnit.SECONDS) && count++ < 20) { + System.out.println("count " + latch.getCount()); } - }); - pool.shutdown(); - pool.awaitTermination(10, TimeUnit.SECONDS); - - int count = 0; - - // give it 20 seconds - while(!latch.await(1, TimeUnit.SECONDS) && count++ < 20) { - System.out.println("count " + latch.getCount()); - } - - - container.destroy(); + container.destroy(); } finally { - try { if(brokerService1 != null) { + try { if(brokerService1 != null) { brokerService1.stop(); }} catch(Throwable t) { t.printStackTrace(); } - try { if(brokerService2 != null) { + try { if(brokerService2 != null) { brokerService2.stop(); }} catch(Throwable t) { t.printStackTrace(); } } @@ -158,7 +165,5 @@ public class AMQ2754Test extends TestCase { if(latch.getCount() > 0) { fail("latch should have gone down to 0 but was " + latch.getCount()); } - } - -} \ No newline at end of file +} \ No newline at end of file diff --git a/activemq-spring/src/test/java/org/apache/bugs/LoadBalanceTest.java b/activemq-spring/src/test/java/org/apache/bugs/LoadBalanceTest.java index b532cec7d3..99a008794a 100644 --- a/activemq-spring/src/test/java/org/apache/bugs/LoadBalanceTest.java +++ b/activemq-spring/src/test/java/org/apache/bugs/LoadBalanceTest.java @@ -39,9 +39,9 @@ import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.network.NetworkConnector; import org.apache.activemq.store.memory.MemoryPersistenceAdapter; import org.junit.Ignore; +import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.junit.Test; import org.springframework.jms.connection.CachingConnectionFactory; import org.springframework.jms.connection.SingleConnectionFactory; import org.springframework.jms.core.JmsTemplate; @@ -61,48 +61,49 @@ public class LoadBalanceTest { final AtomicInteger broker1Count = new AtomicInteger(0); final AtomicInteger broker2Count = new AtomicInteger(0); final CountDownLatch startProducer = new CountDownLatch(1); + + String broker1Uri; + String broker2Uri; + try { - { - brokerService1 = new BrokerService(); - brokerService1.setBrokerName("one"); - brokerService1.setUseJmx(false); - brokerService1 - .setPersistenceAdapter(new MemoryPersistenceAdapter()); - brokerService1.addConnector("nio://0.0.0.0:61616"); - final NetworkConnector network1 = brokerService1 - .addNetworkConnector("static:(tcp://localhost:51515)"); - network1.setName("network1"); - network1.setDynamicOnly(true); - network1.setNetworkTTL(3); - network1.setPrefetchSize(networkBridgePrefetch); - network1.setConduitSubscriptions(false); - network1.setDecreaseNetworkConsumerPriority(false); - network1.setDispatchAsync(false); - brokerService1.start(); - } - { - brokerService2 = new BrokerService(); - brokerService2.setBrokerName("two"); - brokerService2.setUseJmx(false); - brokerService2 - .setPersistenceAdapter(new MemoryPersistenceAdapter()); - brokerService2.addConnector("nio://0.0.0.0:51515"); - final NetworkConnector network2 = brokerService2 - .addNetworkConnector("static:(tcp://localhost:61616)"); - network2.setName("network1"); - network2.setDynamicOnly(true); - network2.setNetworkTTL(3); - network2.setPrefetchSize(networkBridgePrefetch); - network2.setConduitSubscriptions(false); - network2.setDecreaseNetworkConsumerPriority(false); - network2.setDispatchAsync(false); - brokerService2.start(); - } + + + brokerService1 = new BrokerService(); + brokerService1.setBrokerName("one"); + brokerService1.setUseJmx(false); + brokerService1.setPersistenceAdapter(new MemoryPersistenceAdapter()); + broker1Uri = brokerService1.addConnector("nio://0.0.0.0:0").getPublishableConnectString(); + + brokerService2 = new BrokerService(); + brokerService2.setBrokerName("two"); + brokerService2.setUseJmx(false); + brokerService2.setPersistenceAdapter(new MemoryPersistenceAdapter()); + broker2Uri = brokerService2.addConnector("nio://0.0.0.0:0").getPublishableConnectString(); + + final NetworkConnector network1 = brokerService1.addNetworkConnector("static:("+broker2Uri+")"); + network1.setName("network1"); + network1.setDynamicOnly(true); + network1.setNetworkTTL(3); + network1.setPrefetchSize(networkBridgePrefetch); + network1.setConduitSubscriptions(false); + network1.setDecreaseNetworkConsumerPriority(false); + network1.setDispatchAsync(false); + + final NetworkConnector network2 = brokerService2.addNetworkConnector("static:("+broker1Uri+")"); + network2.setName("network1"); + network2.setDynamicOnly(true); + network2.setNetworkTTL(3); + network2.setPrefetchSize(networkBridgePrefetch); + network2.setConduitSubscriptions(false); + network2.setDecreaseNetworkConsumerPriority(false); + network2.setDispatchAsync(false); + + brokerService1.start(); + brokerService2.start(); + final ExecutorService pool = Executors.newSingleThreadExecutor(); - final ActiveMQConnectionFactory connectionFactory1 = new ActiveMQConnectionFactory( - "vm://one"); - final SingleConnectionFactory singleConnectionFactory1 = new SingleConnectionFactory( - connectionFactory1); + final ActiveMQConnectionFactory connectionFactory1 = new ActiveMQConnectionFactory("vm://one"); + final SingleConnectionFactory singleConnectionFactory1 = new SingleConnectionFactory(connectionFactory1); singleConnectionFactory1.setReconnectOnException(true); final DefaultMessageListenerContainer container1 = new DefaultMessageListenerContainer(); container1.setConnectionFactory(singleConnectionFactory1); @@ -110,6 +111,7 @@ public class LoadBalanceTest { container1.setDestination(new ActiveMQQueue("testingqueue")); container1.setMessageListener(new MessageListener() { + @Override public void onMessage(final Message message) { broker1Count.incrementAndGet(); } @@ -118,6 +120,7 @@ public class LoadBalanceTest { container1.start(); pool.submit(new Callable() { + @Override public Object call() throws Exception { try { final ActiveMQConnectionFactory connectionFactory2 = new ActiveMQConnectionFactory( @@ -133,6 +136,7 @@ public class LoadBalanceTest { "testingqueue")); container2.setMessageListener(new MessageListener() { + @Override public void onMessage(final Message message) { broker2Count.incrementAndGet(); } @@ -151,6 +155,7 @@ public class LoadBalanceTest { for (int i = 0; i < total; i++) { template.send(queue, new MessageCreator() { + @Override public Message createMessage( final Session session) throws JMSException { @@ -197,6 +202,7 @@ public class LoadBalanceTest { try { if (brokerService1 != null) { brokerService1.stop(); + brokerService1.waitUntilStopped(); } } catch (final Throwable t) { t.printStackTrace(); @@ -204,12 +210,13 @@ public class LoadBalanceTest { try { if (brokerService2 != null) { brokerService2.stop(); + brokerService2.waitUntilStopped(); } } catch (final Throwable t) { t.printStackTrace(); } } - + if (broker1Count.get() < 25 || broker2Count.get() < 25) { fail("Each broker should have gotten at least 25 messages but instead broker1 got " + broker1Count.get() @@ -240,6 +247,7 @@ public class LoadBalanceTest { container1.setDestination(new ActiveMQQueue(TESTING_QUEUE)); container1.setMessageListener(new MessageListener() { + @Override public void onMessage(final Message message) { broker1Count.incrementAndGet(); } @@ -248,6 +256,7 @@ public class LoadBalanceTest { container1.start(); pool.submit(new Callable() { + @Override public Object call() throws Exception { System.setProperty("lbt.brokerName", "two"); final ActiveMQConnectionFactory connectionFactory2 = new ActiveMQConnectionFactory( @@ -261,16 +270,17 @@ public class LoadBalanceTest { container2.setDestination(new ActiveMQQueue(TESTING_QUEUE)); container2.setMessageListener(new MessageListener() { + @Override public void onMessage(final Message message) { broker2Count.incrementAndGet(); } }); container2.afterPropertiesSet(); container2.start(); - - + + assertTrue("wait for start signal", startProducer.await(20, TimeUnit.SECONDS)); - + final CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory( singleConnectionFactory2); final JmsTemplate template = new JmsTemplate( @@ -279,6 +289,7 @@ public class LoadBalanceTest { for (int i = 0; i < total; i++) { template.send(queue, new MessageCreator() { + @Override public Message createMessage(final Session session) throws JMSException { final TextMessage message = session @@ -291,14 +302,14 @@ public class LoadBalanceTest { return null; } }); - + // give network a chance to build, needs advisories waitForBridgeFormation(); startProducer.countDown(); - + pool.shutdown(); pool.awaitTermination(10, TimeUnit.SECONDS); - + LOG.info("broker1Count " + broker1Count.get() + ", broker2Count " + broker2Count.get()); int count = 0;