diff --git a/activemq-core/src/test/java/org/apache/activemq/EmbeddedBrokerTestSupport.java b/activemq-core/src/test/java/org/apache/activemq/EmbeddedBrokerTestSupport.java index 28dd7adcdb..c4d2d953a6 100644 --- a/activemq-core/src/test/java/org/apache/activemq/EmbeddedBrokerTestSupport.java +++ b/activemq-core/src/test/java/org/apache/activemq/EmbeddedBrokerTestSupport.java @@ -27,8 +27,8 @@ import javax.jms.Destination; /** * A useful base class which creates and closes an embedded broker - * - * + * + * */ public abstract class EmbeddedBrokerTestSupport extends CombinationTestSupport { @@ -39,7 +39,7 @@ public abstract class EmbeddedBrokerTestSupport extends CombinationTestSupport { protected boolean useTopic; protected ActiveMQDestination destination; protected JmsTemplate template; - + protected void setUp() throws Exception { if (broker == null) { broker = createBroker(); @@ -64,7 +64,7 @@ public abstract class EmbeddedBrokerTestSupport extends CombinationTestSupport { /** * Factory method to create a new {@link JmsTemplate} - * + * * @return a newly created JmsTemplate */ protected JmsTemplate createJmsTemplate() { @@ -73,7 +73,7 @@ public abstract class EmbeddedBrokerTestSupport extends CombinationTestSupport { /** * Factory method to create a new {@link Destination} - * + * * @return newly created Destinaiton */ protected ActiveMQDestination createDestination() { @@ -101,7 +101,7 @@ public abstract class EmbeddedBrokerTestSupport extends CombinationTestSupport { /** * Factory method to create a new {@link ConnectionFactory} instance - * + * * @return a newly created connection factory */ protected ConnectionFactory createConnectionFactory() throws Exception { @@ -110,7 +110,7 @@ public abstract class EmbeddedBrokerTestSupport extends CombinationTestSupport { /** * Factory method to create a new broker - * + * * @throws Exception */ protected BrokerService createBroker() throws Exception { diff --git a/activemq-core/src/test/java/org/apache/activemq/OnePrefetchAsyncConsumerTest.java b/activemq-core/src/test/java/org/apache/activemq/OnePrefetchAsyncConsumerTest.java index 900b1c0ab7..74040bf123 100644 --- a/activemq-core/src/test/java/org/apache/activemq/OnePrefetchAsyncConsumerTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/OnePrefetchAsyncConsumerTest.java @@ -21,6 +21,7 @@ import java.util.concurrent.TimeUnit; import javax.jms.Connection; import javax.jms.ConnectionConsumer; +import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; @@ -61,9 +62,9 @@ public class OnePrefetchAsyncConsumerTest extends EmbeddedBrokerTestSupport { // Msg3 will cause the test to fail as it will attempt to retrieve an additional ServerSession from // an exhausted ServerSessionPool due to the (incorrectly?) incremented prefetchExtension in the PrefetchSubscription producer.send(session.createTextMessage("Msg3")); - + session.commit(); - + // wait for test to complete and the test result to get set // this happens asynchronously since the messages are delivered asynchronously synchronized (testMutex) { @@ -71,13 +72,18 @@ public class OnePrefetchAsyncConsumerTest extends EmbeddedBrokerTestSupport { testMutex.wait(); } } - + //test completed, result is ready assertTrue("Attempted to retrieve more than one ServerSession at a time", testMutex.testSuccessful); } + @Override + protected ConnectionFactory createConnectionFactory() throws Exception { + return new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getPublishableConnectString()); + } + protected void setUp() throws Exception { - bindAddress = "tcp://localhost:61616"; + bindAddress = "tcp://localhost:0"; super.setUp(); testMutex = new TestMutex(); @@ -105,7 +111,7 @@ public class OnePrefetchAsyncConsumerTest extends EmbeddedBrokerTestSupport { answer.setDestinationPolicy(policyMap); return answer; } - + protected Queue createQueue() { return new ActiveMQQueue(getDestinationString()); } diff --git a/activemq-core/src/test/java/org/apache/activemq/ReconnectWithSameClientIDTest.java b/activemq-core/src/test/java/org/apache/activemq/ReconnectWithSameClientIDTest.java index a9352396c7..516a159449 100644 --- a/activemq-core/src/test/java/org/apache/activemq/ReconnectWithSameClientIDTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/ReconnectWithSameClientIDTest.java @@ -17,18 +17,14 @@ package org.apache.activemq; import javax.jms.Connection; +import javax.jms.ConnectionFactory; import javax.jms.InvalidClientIDException; import javax.jms.JMSException; import javax.jms.Session; -import org.apache.activemq.store.kahadaptor.KahaPersistenceAdapter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** - * - * - */ public class ReconnectWithSameClientIDTest extends EmbeddedBrokerTestSupport { private static final Logger LOG = LoggerFactory.getLogger(ReconnectWithSameClientIDTest.class); @@ -59,8 +55,13 @@ public class ReconnectWithSameClientIDTest extends EmbeddedBrokerTestSupport { useConnection(connection); } + @Override + protected ConnectionFactory createConnectionFactory() throws Exception { + return new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getPublishableConnectString()); + } + protected void setUp() throws Exception { - bindAddress = "tcp://localhost:61616"; + bindAddress = "tcp://localhost:0"; super.setUp(); } @@ -75,9 +76,5 @@ public class ReconnectWithSameClientIDTest extends EmbeddedBrokerTestSupport { protected void useConnection(Connection connection) throws JMSException { connection.setClientID("foo"); connection.start(); - /** - * Session session = connection.createSession(transacted, authMode); - * return session; - */ } } diff --git a/activemq-core/src/test/java/org/apache/activemq/advisory/MasterSlaveTempQueueMemoryTest.java b/activemq-core/src/test/java/org/apache/activemq/advisory/MasterSlaveTempQueueMemoryTest.java index 7bef61c566..655bb40189 100644 --- a/activemq-core/src/test/java/org/apache/activemq/advisory/MasterSlaveTempQueueMemoryTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/advisory/MasterSlaveTempQueueMemoryTest.java @@ -29,19 +29,15 @@ import org.apache.activemq.ActiveMQSession; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.region.Queue; import org.apache.activemq.broker.region.RegionBroker; -import org.apache.activemq.broker.region.policy.DispatchPolicy; import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - public class MasterSlaveTempQueueMemoryTest extends TempQueueMemoryTest { - + private static final transient Logger LOG = LoggerFactory.getLogger(MasterSlaveTempQueueMemoryTest.class); - - String masterBindAddress = "tcp://localhost:61616"; - String slaveBindAddress = "tcp://localhost:62616"; + BrokerService slave; /* @@ -51,17 +47,16 @@ public class MasterSlaveTempQueueMemoryTest extends TempQueueMemoryTest { @Override protected BrokerService createBroker() throws Exception { // bindAddress is used by super.createBroker - bindAddress = masterBindAddress; + bindAddress = "tcp://localhost:0"; BrokerService master = super.createBroker(); master.setBrokerName("master"); configureBroker(master); - bindAddress = slaveBindAddress; slave = super.createBroker(); slave.setBrokerName("slave"); - slave.setMasterConnectorURI(masterBindAddress); - + slave.setMasterConnectorURI(master.getTransportConnectors().get(0).getPublishableConnectString()); + configureBroker(slave); - bindAddress = masterBindAddress; + bindAddress = master.getTransportConnectors().get(0).getPublishableConnectString(); return master; } @@ -74,15 +69,15 @@ public class MasterSlaveTempQueueMemoryTest extends TempQueueMemoryTest { // optimized dispatch does not effect the determinism of inflight between // master and slave in this test //broker.setDestinationPolicy(policyMap); - + } @Override protected void startBroker() throws Exception { - - // because master will wait for slave to connect it needs + + // because master will wait for slave to connect it needs // to be in a separate thread - Thread starterThread = new Thread() { + Thread starterThread = new Thread() { public void run() { try { broker.setWaitForSlave(true); @@ -94,7 +89,7 @@ public class MasterSlaveTempQueueMemoryTest extends TempQueueMemoryTest { } }; starterThread.start(); - + slave.start(); starterThread.join(60*1000); assertTrue("slave is indeed a slave", slave.isSlave()); @@ -104,7 +99,7 @@ public class MasterSlaveTempQueueMemoryTest extends TempQueueMemoryTest { protected void tearDown() throws Exception { slave.stop(); super.tearDown(); - + } @Override @@ -112,25 +107,25 @@ public class MasterSlaveTempQueueMemoryTest extends TempQueueMemoryTest { super.testLoadRequestReply(); Thread.sleep(2000); - + // some checks on the slave AdvisoryBroker ab = (AdvisoryBroker) slave.getBroker().getAdaptor( AdvisoryBroker.class); - + assertEquals("the temp queues should not be visible as they are removed", 1, ab.getAdvisoryDestinations().size()); - + RegionBroker rb = (RegionBroker) slave.getBroker().getAdaptor( - RegionBroker.class); - - //serverDestination + - assertEquals(6, rb.getDestinationMap().size()); - + RegionBroker.class); + + //serverDestination + + assertEquals(6, rb.getDestinationMap().size()); + RegionBroker masterRb = (RegionBroker) broker.getBroker().getAdaptor( RegionBroker.class); LOG.info("enqueues " + rb.getDestinationStatistics().getEnqueues().getCount()); assertEquals("enqueues match", rb.getDestinationStatistics().getEnqueues().getCount(), masterRb.getDestinationStatistics().getEnqueues().getCount()); - + LOG.info("dequeues " + rb.getDestinationStatistics().getDequeues().getCount()); assertEquals("dequeues match", rb.getDestinationStatistics().getDequeues().getCount(), @@ -145,24 +140,24 @@ public class MasterSlaveTempQueueMemoryTest extends TempQueueMemoryTest { // slave does not actually dispatch any messages, so no request/reply(2) pair per iteration(COUNT) // slave estimate must be >= actual master value // master does not always reach expected total, should be assertEquals.., why? - assertTrue("dispatched to slave is as good as master, master=" + assertTrue("dispatched to slave is as good as master, master=" + masterRb.getDestinationStatistics().getDispatched().getCount(), - rb.getDestinationStatistics().getDispatched().getCount() + 2*messagesToSend >= + rb.getDestinationStatistics().getDispatched().getCount() + 2*messagesToSend >= masterRb.getDestinationStatistics().getDispatched().getCount()); } - + public void testMoreThanPageSizeUnacked() throws Exception { - + final int messageCount = Queue.MAX_PAGE_SIZE + 10; final CountDownLatch latch = new CountDownLatch(1); - + serverSession = serverConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); ActiveMQSession s = (ActiveMQSession) serverSession; s.setSessionAsyncDispatch(true); - + MessageConsumer serverConsumer = serverSession.createConsumer(serverDestination); serverConsumer.setMessageListener(new MessageListener() { - + public void onMessage(Message msg) { try { latch.await(30L, TimeUnit.SECONDS); @@ -171,41 +166,41 @@ public class MasterSlaveTempQueueMemoryTest extends TempQueueMemoryTest { } } }); - + MessageProducer producer = clientSession.createProducer(serverDestination); for (int i =0; i< messageCount; i++) { Message msg = clientSession.createMessage(); producer.send(msg); } Thread.sleep(5000); - + RegionBroker slaveRb = (RegionBroker) slave.getBroker().getAdaptor( RegionBroker.class); RegionBroker masterRb = (RegionBroker) broker.getBroker().getAdaptor( RegionBroker.class); - - assertEquals("inflight match expected", messageCount, masterRb.getDestinationStatistics().getInflight().getCount()); + + assertEquals("inflight match expected", messageCount, masterRb.getDestinationStatistics().getInflight().getCount()); assertEquals("inflight match on slave and master", slaveRb.getDestinationStatistics().getInflight().getCount(), masterRb.getDestinationStatistics().getInflight().getCount()); - + latch.countDown(); Thread.sleep(5000); - assertEquals("inflight match expected", 0, masterRb.getDestinationStatistics().getInflight().getCount()); + assertEquals("inflight match expected", 0, masterRb.getDestinationStatistics().getInflight().getCount()); assertEquals("inflight match on slave and master", slaveRb.getDestinationStatistics().getInflight().getCount(), masterRb.getDestinationStatistics().getInflight().getCount()); } - + public void testLoadRequestReplyWithNoTempQueueDelete() throws Exception { deleteTempQueue = false; messagesToSend = 10; testLoadRequestReply(); } - + public void testLoadRequestReplyWithTransactions() throws Exception { serverTransactional = clientTransactional = true; messagesToSend = 100; reInitialiseSessions(); testLoadRequestReply(); } - + public void testConcurrentConsumerLoadRequestReplyWithTransactions() throws Exception { serverTransactional = true; numConsumers = numProducers = 10; @@ -215,10 +210,10 @@ public class MasterSlaveTempQueueMemoryTest extends TempQueueMemoryTest { } protected void reInitialiseSessions() throws Exception { - // reinitialize so they can respect the transactional flags + // reinitialize so they can respect the transactional flags serverSession.close(); clientSession.close(); - serverSession = serverConnection.createSession(serverTransactional, + serverSession = serverConnection.createSession(serverTransactional, serverTransactional ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE); clientSession = clientConnection.createSession(clientTransactional, clientTransactional ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE); diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java b/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java index 7fc6a08f7d..eaf303d7a9 100644 --- a/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java @@ -25,6 +25,7 @@ import java.util.Map; import javax.jms.BytesMessage; import javax.jms.Connection; +import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.Message; import javax.jms.MessageConsumer; @@ -48,7 +49,6 @@ import org.apache.activemq.broker.region.BaseDestination; import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy; -import org.apache.activemq.command.ActiveMQBlobMessage; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTempQueue; @@ -90,8 +90,9 @@ public class MBeanTest extends EmbeddedBrokerTestSupport { public void testConnectors() throws Exception{ ObjectName brokerName = assertRegisteredObjectName(domain + ":Type=Broker,BrokerName=localhost"); BrokerViewMBean broker = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true); - assertEquals("openwire URL port doesn't equal bind Address", new URI(broker.getOpenWireURL()).getPort(), new URI(this.bindAddress).getPort()); - + assertEquals("openwire URL port doesn't equal bind Address", + new URI(broker.getOpenWireURL()).getPort(), + new URI(this.broker.getTransportConnectors().get(0).getPublishableConnectString()).getPort()); } public void testMBeans() throws Exception { @@ -317,10 +318,9 @@ public class MBeanTest extends EmbeddedBrokerTestSupport { String newDestination = getSecondDestinationString(); long queueSize = queue.getQueueSize(); + assertTrue(queueSize > 0); queue.copyMatchingMessagesTo("counter > 2", newDestination); - - queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + newDestination + ",BrokerName=localhost"); queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); @@ -334,6 +334,7 @@ public class MBeanTest extends EmbeddedBrokerTestSupport { assertEquals("dest has no memory usage", 0, queue.getMemoryPercentUsage()); } + @SuppressWarnings("rawtypes") protected void assertSendViaMBean() throws Exception { String queueName = getDestinationString() + ".SendMBBean"; @@ -353,7 +354,7 @@ public class MBeanTest extends EmbeddedBrokerTestSupport { for (int i = 0; i < count; i++) { String body = "message:" + i; - Map headers = new HashMap(); + Map headers = new HashMap(); headers.put("JMSCorrelationID", "MyCorrId"); headers.put("JMSDeliveryMode", Boolean.FALSE); headers.put("JMSXGroupID", "MyGroupID"); @@ -370,7 +371,6 @@ public class MBeanTest extends EmbeddedBrokerTestSupport { if (compdatalist.length == 0) { fail("There is no message in the queue:"); } - String[] messageIDs = new String[compdatalist.length]; for (int i = 0; i < compdatalist.length; i++) { CompositeData cdata = compdatalist[i]; @@ -407,7 +407,6 @@ public class MBeanTest extends EmbeddedBrokerTestSupport { assertComplexData(i, cdata, "JMSXGroupSeq", 1234); assertComplexData(i, cdata, "JMSXGroupID", "MyGroupID"); assertComplexData(i, cdata, "Text", "message:" + i); - } } @@ -632,7 +631,7 @@ public class MBeanTest extends EmbeddedBrokerTestSupport { } protected void setUp() throws Exception { - bindAddress = "tcp://localhost:61616"; + bindAddress = "tcp://localhost:0"; useTopic = false; super.setUp(); mbeanServer = broker.getManagementContext().getMBeanServer(); @@ -656,6 +655,11 @@ public class MBeanTest extends EmbeddedBrokerTestSupport { super.tearDown(); } + @Override + protected ConnectionFactory createConnectionFactory() throws Exception { + return new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getPublishableConnectString()); + } + protected BrokerService createBroker() throws Exception { BrokerService answer = new BrokerService(); answer.setPersistent(false); @@ -691,7 +695,6 @@ public class MBeanTest extends EmbeddedBrokerTestSupport { Thread.sleep(1000); } - protected void useConnectionWithBlobMessage(Connection connection) throws Exception { connection.setClientID(clientID); connection.start(); @@ -733,7 +736,6 @@ public class MBeanTest extends EmbeddedBrokerTestSupport { LOG.info(text); } - protected String getSecondDestinationString() { return "test.new.destination." + getClass() + "." + getName(); } @@ -815,7 +817,6 @@ public class MBeanTest extends EmbeddedBrokerTestSupport { } catch (Exception e) { // expected! } - } // Test for AMQ-3029 diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/jmx/PurgeTest.java b/activemq-core/src/test/java/org/apache/activemq/broker/jmx/PurgeTest.java index b5ec91543d..c3ebf98731 100644 --- a/activemq-core/src/test/java/org/apache/activemq/broker/jmx/PurgeTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/broker/jmx/PurgeTest.java @@ -25,6 +25,7 @@ import javax.management.ObjectName; import junit.framework.Test; import junit.textui.TestRunner; +import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.EmbeddedBrokerTestSupport; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.store.PersistenceAdapter; @@ -36,8 +37,6 @@ import org.slf4j.LoggerFactory; /** * A specific test of Queue.purge() functionality - * - * */ public class PurgeTest extends EmbeddedBrokerTestSupport { private static final Logger LOG = LoggerFactory.getLogger(PurgeTest.class); @@ -118,7 +117,6 @@ public class PurgeTest extends EmbeddedBrokerTestSupport { Message message = session.createTextMessage("Test Message"); producer.send(message); - MessageConsumer consumer = session.createConsumer(destination); Message received = consumer.receive(1000); @@ -128,16 +126,12 @@ public class PurgeTest extends EmbeddedBrokerTestSupport { BrokerViewMBean brokerProxy = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerViewMBeanName, BrokerViewMBean.class, true); brokerProxy.removeQueue(getDestinationString()); - - producer.send(message); received = consumer.receive(1000); assertNotNull("Message not received", received); assertEquals(message, received); - - } public void testDelete() throws Exception { @@ -209,7 +203,7 @@ public class PurgeTest extends EmbeddedBrokerTestSupport { } protected void setUp() throws Exception { - bindAddress = "tcp://localhost:61616"; + bindAddress = "tcp://localhost:0"; useTopic = false; super.setUp(); mbeanServer = broker.getManagementContext().getMBeanServer(); @@ -233,6 +227,11 @@ public class PurgeTest extends EmbeddedBrokerTestSupport { return answer; } + @Override + protected ConnectionFactory createConnectionFactory() throws Exception { + return new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getPublishableConnectString()); + } + protected void echo(String text) { LOG.info(text); } diff --git a/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1687Test.java b/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1687Test.java index 0ea4dbabc1..3b02cdd22a 100644 --- a/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1687Test.java +++ b/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1687Test.java @@ -29,8 +29,8 @@ import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.spring.ConsumerBean; /** - * - * + * + * */ public class AMQ1687Test extends EmbeddedBrokerTestSupport { @@ -40,10 +40,8 @@ public class AMQ1687Test extends EmbeddedBrokerTestSupport { protected ConnectionFactory createConnectionFactory() throws Exception { //prefetch change is not required, but test will not fail w/o it, only spew errors in the AMQ log. return new ActiveMQConnectionFactory(this.bindAddress+"?jms.prefetchPolicy.all=5"); - //return super.createConnectionFactory(); - //return new ActiveMQConnectionFactory("tcp://localhost:61616"); } - + public void testVirtualTopicCreation() throws Exception { if (connection == null) { connection = createConnection(); @@ -52,10 +50,10 @@ public class AMQ1687Test extends EmbeddedBrokerTestSupport { ConsumerBean messageList = new ConsumerBean(); messageList.setVerbose(true); - + String queueAName = getVirtualTopicConsumerName(); String queueBName = getVirtualTopicConsumerNameB(); - + // create consumer 'cluster' ActiveMQQueue queue1 = new ActiveMQQueue(queueAName); ActiveMQQueue queue2 = new ActiveMQQueue(queueBName); @@ -76,16 +74,14 @@ public class AMQ1687Test extends EmbeddedBrokerTestSupport { for (int i = 0; i < total; i++) { producer.send(session.createTextMessage("message: " + i)); } - + messageList.assertMessagesArrived(total*2); } - protected String getVirtualTopicName() { return "VirtualTopic.TEST"; } - protected String getVirtualTopicConsumerName() { return "Consumer.A.VirtualTopic.TEST"; } @@ -93,8 +89,7 @@ public class AMQ1687Test extends EmbeddedBrokerTestSupport { protected String getVirtualTopicConsumerNameB() { return "Consumer.B.VirtualTopic.TEST"; } - - + protected void setUp() throws Exception { this.bindAddress="tcp://localhost:61616"; super.setUp(); diff --git a/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1866.java b/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1866.java index b0e6ddf4cd..a850a3fc62 100644 --- a/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1866.java +++ b/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1866.java @@ -40,20 +40,20 @@ import org.slf4j.LoggerFactory; /** * This is a test case for the issue reported at: * https://issues.apache.org/activemq/browse/AMQ-1866 - * - * If you have a JMS producer sending messages to multiple fast consumers and - * one slow consumer, eventually all consumers will run as slow as - * the slowest consumer. + * + * If you have a JMS producer sending messages to multiple fast consumers and + * one slow consumer, eventually all consumers will run as slow as + * the slowest consumer. */ public class AMQ1866 extends TestCase { private static final Logger log = LoggerFactory.getLogger(ConsumerThread.class); private BrokerService brokerService; private ArrayList threads = new ArrayList(); - - String ACTIVEMQ_BROKER_BIND = "tcp://localhost:61616"; - String ACTIVEMQ_BROKER_URI = "tcp://localhost:61616"; - + + private final String ACTIVEMQ_BROKER_BIND = "tcp://localhost:0"; + private String ACTIVEMQ_BROKER_URI; + AtomicBoolean shutdown = new AtomicBoolean(); private ActiveMQQueue destination; @@ -65,19 +65,21 @@ public class AMQ1866 extends TestCase { adaptor.setIndexBinSize(4096); brokerService.setPersistenceAdapter(adaptor); brokerService.deleteAllMessages(); - + // A small max page size makes this issue occur faster. PolicyMap policyMap = new PolicyMap(); PolicyEntry pe = new PolicyEntry(); pe.setMaxPageSize(1); policyMap.put(new ActiveMQQueue(">"), pe); brokerService.setDestinationPolicy(policyMap); - + brokerService.addConnector(ACTIVEMQ_BROKER_BIND); brokerService.start(); + + ACTIVEMQ_BROKER_URI = brokerService.getTransportConnectors().get(0).getPublishableConnectString(); destination = new ActiveMQQueue(getName()); } - + @Override protected void tearDown() throws Exception { // Stop any running threads. @@ -85,30 +87,29 @@ public class AMQ1866 extends TestCase { for (Thread t : threads) { t.interrupt(); t.join(); - } + } brokerService.stop(); } public void testConsumerSlowDownPrefetch0() throws Exception { - ACTIVEMQ_BROKER_URI = "tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=0"; + ACTIVEMQ_BROKER_URI = ACTIVEMQ_BROKER_URI + "?jms.prefetchPolicy.queuePrefetch=0"; doTestConsumerSlowDown(); } public void testConsumerSlowDownPrefetch10() throws Exception { - ACTIVEMQ_BROKER_URI = "tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=10"; + ACTIVEMQ_BROKER_URI = ACTIVEMQ_BROKER_URI + "?jms.prefetchPolicy.queuePrefetch=10"; doTestConsumerSlowDown(); } - + public void testConsumerSlowDownDefaultPrefetch() throws Exception { - ACTIVEMQ_BROKER_URI = "tcp://localhost:61616"; doTestConsumerSlowDown(); } public void doTestConsumerSlowDown() throws Exception { - + // Preload the queue. produce(20000); - + Thread producer = new Thread() { @Override public void run() { @@ -122,7 +123,7 @@ public class AMQ1866 extends TestCase { }; threads.add(producer); producer.start(); - + // This is the slow consumer. ConsumerThread c1 = new ConsumerThread("Consumer-1"); threads.add(c1); @@ -139,33 +140,33 @@ public class AMQ1866 extends TestCase { Thread.sleep(1000); long c1Counter = c1.counter.getAndSet(0); long c2Counter = c2.counter.getAndSet(0); - System.out.println("c1: "+c1Counter+", c2: "+c2Counter); + log.debug("c1: "+c1Counter+", c2: "+c2Counter); totalReceived += c1Counter; totalReceived += c2Counter; - + // Once message have been flowing for a few seconds, start asserting that c2 always gets messages. It should be receiving about 100 / sec if( i > 10 ) { assertTrue("Total received=" + totalReceived + ", Consumer 2 should be receiving new messages every second.", c2Counter > 0); } } - } - + } + public void produce(int count) throws Exception { Connection connection=null; try { ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_BROKER_URI); factory.setDispatchAsync(true); - + connection = factory.createConnection(); - + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageProducer producer = session.createProducer(destination); connection.start(); - + for( int i=0 ; i< count; i++ ) { producer.send(session.createTextMessage(getName()+" Message "+(++i))); } - + } finally { try { connection.close(); @@ -173,7 +174,7 @@ public class AMQ1866 extends TestCase { } } } - + public class ConsumerThread extends Thread { final AtomicLong counter = new AtomicLong(); @@ -185,16 +186,16 @@ public class AMQ1866 extends TestCase { Connection connection=null; try { log.debug(getName() + ": is running"); - + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_BROKER_URI); factory.setDispatchAsync(true); - + connection = factory.createConnection(); - + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageConsumer consumer = session.createConsumer(destination); connection.start(); - + while (!shutdown.get()) { TextMessage msg = (TextMessage)consumer.receive(1000); if ( msg!=null ) { @@ -202,13 +203,13 @@ public class AMQ1866 extends TestCase { if (getName().equals("Consumer-1")) { sleepingTime = 1000 * 1000; } else { - sleepingTime = 1; + sleepingTime = 1; } counter.incrementAndGet(); Thread.sleep(sleepingTime); } } - + } catch (Exception e) { } finally { log.debug(getName() + ": is stopping"); diff --git a/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1917Test.java b/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1917Test.java index 8eb80590a0..145916449a 100644 --- a/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1917Test.java +++ b/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1917Test.java @@ -43,21 +43,22 @@ public class AMQ1917Test extends TestCase { private static final int NUM_MESSAGES = 4000; private static final int NUM_THREADS = 10; - public static final String REQUEST_QUEUE = "mock.in.queue"; - public static final String REPLY_QUEUE = "mock.out.queue"; + private static final String REQUEST_QUEUE = "mock.in.queue"; + private static final String REPLY_QUEUE = "mock.out.queue"; - Destination requestDestination = ActiveMQDestination.createDestination( + private Destination requestDestination = ActiveMQDestination.createDestination( REQUEST_QUEUE, ActiveMQDestination.QUEUE_TYPE); - Destination replyDestination = ActiveMQDestination.createDestination( + private Destination replyDestination = ActiveMQDestination.createDestination( REPLY_QUEUE, ActiveMQDestination.QUEUE_TYPE); - CountDownLatch roundTripLatch = new CountDownLatch(NUM_MESSAGES); - CountDownLatch errorLatch = new CountDownLatch(1); - ThreadPoolExecutor tpe; - final String BROKER_URL = "tcp://localhost:61616"; - BrokerService broker = null; + private CountDownLatch roundTripLatch = new CountDownLatch(NUM_MESSAGES); + private CountDownLatch errorLatch = new CountDownLatch(1); + private ThreadPoolExecutor tpe; + private final String BROKER_URL = "tcp://localhost:61616"; + private String connectionUri; + private BrokerService broker = null; private boolean working = true; - + // trival session/producer pool final Session[] sessions = new Session[NUM_THREADS]; final MessageProducer[] producers = new MessageProducer[NUM_THREADS]; @@ -67,11 +68,13 @@ public class AMQ1917Test extends TestCase { broker.setPersistent(false); broker.addConnector(BROKER_URL); broker.start(); - + + connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString(); + BlockingQueue queue = new ArrayBlockingQueue(10000); tpe = new ThreadPoolExecutor(NUM_THREADS, NUM_THREADS, 60000, TimeUnit.MILLISECONDS, queue); - ThreadFactory limitedthreadFactory = new LimitedThreadFactory(tpe.getThreadFactory()); + ThreadFactory limitedthreadFactory = new LimitedThreadFactory(tpe.getThreadFactory()); tpe.setThreadFactory(limitedthreadFactory); } @@ -79,29 +82,29 @@ public class AMQ1917Test extends TestCase { broker.stop(); tpe.shutdown(); } - - public void testLoadedSendRecieveWithCorrelationId() throws Exception { - + + public void testLoadedSendRecieveWithCorrelationId() throws Exception { + ActiveMQConnectionFactory connectionFactory = new org.apache.activemq.ActiveMQConnectionFactory(); - connectionFactory.setBrokerURL(BROKER_URL); - Connection connection = connectionFactory.createConnection(); + connectionFactory.setBrokerURL(connectionUri); + Connection connection = connectionFactory.createConnection(); setupReceiver(connection); connection = connectionFactory.createConnection(); connection.start(); - - // trival session/producer pool + + // trival session/producer pool for (int i=0; i 0); - + connection.close(); - + broker.stop(); broker.waitUntilStopped(); - + createBroker(false); - factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); + factory = new ActiveMQConnectionFactory(connectionUri); connection = factory.createConnection(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); producer = session.createProducer(session.createQueue("test")); @@ -85,20 +87,20 @@ public class AMQ2513Test extends TestCase { connection.start(); producer.send(session.createTextMessage("test123")); connection.close(); - + dv = createView(); assertTrue(dv.getQueueSize() > 0); - + broker.stop(); broker.waitUntilStopped(); - - } - - DestinationViewMBean createView() throws Exception { + + } + + DestinationViewMBean createView() throws Exception { String domain = "org.apache.activemq"; ObjectName name = new ObjectName(domain + ":BrokerName=localhost,Type=Queue,Destination=test"); return (DestinationViewMBean) broker.getManagementContext().newProxyInstance(name, DestinationViewMBean.class, true); } - + } \ No newline at end of file diff --git a/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2616Test.java b/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2616Test.java index 89157b3054..4f6f16816e 100644 --- a/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2616Test.java +++ b/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2616Test.java @@ -21,7 +21,6 @@ import java.util.ArrayList; import java.util.concurrent.atomic.AtomicBoolean; import javax.jms.BytesMessage; import javax.jms.Connection; -import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; @@ -39,17 +38,18 @@ public class AMQ2616Test extends TestCase { private static final int NUMBER = 2000; private BrokerService brokerService; private final ArrayList threads = new ArrayList(); - String ACTIVEMQ_BROKER_BIND = "tcp://0.0.0.0:61616"; - AtomicBoolean shutdown = new AtomicBoolean(); - + private final String ACTIVEMQ_BROKER_BIND = "tcp://0.0.0.0:0"; + private final AtomicBoolean shutdown = new AtomicBoolean(); + + private String connectionUri; + public void testQueueResourcesReleased() throws Exception{ - ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory(ACTIVEMQ_BROKER_BIND); + ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory(connectionUri); Connection tempConnection = fac.createConnection(); tempConnection.start(); Session tempSession = tempConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue tempQueue = tempSession.createTemporaryQueue(); - final MessageConsumer tempConsumer = tempSession.createConsumer(tempQueue); - + Connection testConnection = fac.createConnection(); long startUsage = brokerService.getSystemUsage().getMemoryUsage().getUsage(); Session testSession = testConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -67,8 +67,8 @@ public class AMQ2616Test extends TestCase { endUsage = brokerService.getSystemUsage().getMemoryUsage().getUsage(); assertEquals(startUsage,endUsage); } - - + + @Override protected void setUp() throws Exception { // Start an embedded broker up. @@ -95,6 +95,10 @@ public class AMQ2616Test extends TestCase { brokerService.getSystemUsage().getTempUsage().setLimit(200 * 1024 * 1024); brokerService.addConnector(ACTIVEMQ_BROKER_BIND); brokerService.start(); + brokerService.waitUntilStarted(); + + connectionUri = brokerService.getTransportConnectors().get(0).getPublishableConnectString(); + new ActiveMQQueue(getName()); } diff --git a/activemq-core/src/test/java/org/apache/activemq/bugs/CraigsBugTest.java b/activemq-core/src/test/java/org/apache/activemq/bugs/CraigsBugTest.java index 855f286b69..f956da6d75 100644 --- a/activemq-core/src/test/java/org/apache/activemq/bugs/CraigsBugTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/bugs/CraigsBugTest.java @@ -18,7 +18,6 @@ package org.apache.activemq.bugs; import javax.jms.Connection; import javax.jms.JMSException; -import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.Session; @@ -26,13 +25,12 @@ import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.EmbeddedBrokerTestSupport; import org.apache.activemq.command.ActiveMQQueue; -/** - * - */ public class CraigsBugTest extends EmbeddedBrokerTestSupport { + private String connectionUri; + public void testConnectionFactory() throws Exception { - final ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616"); + final ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(connectionUri); final ActiveMQQueue queue = new ActiveMQQueue("testqueue"); final Connection conn = cf.createConnection(); @@ -60,8 +58,10 @@ public class CraigsBugTest extends EmbeddedBrokerTestSupport { } protected void setUp() throws Exception { - bindAddress = "tcp://localhost:61616"; + bindAddress = "tcp://localhost:0"; super.setUp(); + + connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString(); } } diff --git a/activemq-core/src/test/java/org/apache/activemq/bugs/DataFileNotDeletedTest.java b/activemq-core/src/test/java/org/apache/activemq/bugs/DataFileNotDeletedTest.java index 273dd7b8fe..066aba2034 100644 --- a/activemq-core/src/test/java/org/apache/activemq/bugs/DataFileNotDeletedTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/bugs/DataFileNotDeletedTest.java @@ -41,14 +41,15 @@ public class DataFileNotDeletedTest extends TestCase { private static final Logger LOG = LoggerFactory.getLogger(DataFileNotDeletedTest.class); private final CountDownLatch latch = new CountDownLatch(max_messages); - private static int max_messages = 600; + private final static int max_messages = 600; private static int messageCounter; private final String destinationName = getName()+"_Queue"; private BrokerService broker; private Connection receiverConnection; private Connection producerConnection; - final boolean useTopic = false; - + private final boolean useTopic = false; + private String connectionUri; + AMQPersistenceAdapter persistentAdapter; protected static final String payload = new String(new byte[512]); @@ -61,7 +62,7 @@ public class DataFileNotDeletedTest extends TestCase { producerConnection = createConnection(); producerConnection.start(); } - + @Override public void tearDown() throws Exception { receiverConnection.close(); @@ -72,16 +73,16 @@ public class DataFileNotDeletedTest extends TestCase { public void testForDataFileNotDeleted() throws Exception { doTestForDataFileNotDeleted(false); } - + public void testForDataFileNotDeletedTransacted() throws Exception { doTestForDataFileNotDeleted(true); } - + private void doTestForDataFileNotDeleted(boolean transacted) throws Exception { - + Receiver receiver = new Receiver() { public void receive(String s) throws Exception { - messageCounter++; + messageCounter++; latch.countDown(); } }; @@ -94,7 +95,7 @@ public class DataFileNotDeletedTest extends TestCase { latch.await(); assertEquals(max_messages, messageCounter); LOG.info("Sent and received + " + messageCounter + ", file count " + persistentAdapter.getAsyncDataManager().getFiles().size()); - waitFordataFilesToBeCleanedUp(persistentAdapter.getAsyncDataManager(), 60000, 2); + waitFordataFilesToBeCleanedUp(persistentAdapter.getAsyncDataManager(), 60000, 2); } private void waitFordataFilesToBeCleanedUp( @@ -111,7 +112,7 @@ public class DataFileNotDeletedTest extends TestCase { } private Connection createConnection() throws JMSException { - ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri); return factory.createConnection(); } @@ -120,7 +121,7 @@ public class DataFileNotDeletedTest extends TestCase { broker.setDeleteAllMessagesOnStartup(true); broker.setPersistent(true); broker.setUseJmx(true); - broker.addConnector("tcp://localhost:61616").setName("Default"); + broker.addConnector("tcp://localhost:0").setName("Default"); broker.setPersistenceFactory(new AMQPersistenceAdapterFactory()); AMQPersistenceAdapterFactory factory = (AMQPersistenceAdapterFactory) broker.getPersistenceFactory(); // ensure there are a bunch of data files but multiple entries in each @@ -129,10 +130,13 @@ public class DataFileNotDeletedTest extends TestCase { factory.setCheckpointInterval(500); factory.setCleanupInterval(500); factory.setSyncOnWrite(false); - + persistentAdapter = (AMQPersistenceAdapter) broker.getPersistenceAdapter(); broker.start(); LOG.info("Starting broker.."); + broker.waitUntilStarted(); + + connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString(); } private void buildReceiver(Connection connection, final String queueName, boolean transacted, final Receiver receiver, boolean isTopic) throws Exception { diff --git a/activemq-core/src/test/java/org/apache/activemq/bugs/JmsTimeoutTest.java b/activemq-core/src/test/java/org/apache/activemq/bugs/JmsTimeoutTest.java index 671e7bf5c9..2858302425 100644 --- a/activemq-core/src/test/java/org/apache/activemq/bugs/JmsTimeoutTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/bugs/JmsTimeoutTest.java @@ -16,10 +16,9 @@ */ package org.apache.activemq.bugs; - -import java.io.IOException; import java.util.concurrent.atomic.AtomicInteger; +import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.JMSException; import javax.jms.MessageProducer; @@ -28,128 +27,132 @@ import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.EmbeddedBrokerTestSupport; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.transport.RequestTimedOutIOException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - public class JmsTimeoutTest extends EmbeddedBrokerTestSupport { - static final Logger LOG = LoggerFactory.getLogger(JmsTimeoutTest.class); - - private int messageSize=1024*64; - private int messageCount=10000; - private final AtomicInteger exceptionCount = new AtomicInteger(0); - - /** - * Test the case where the broker is blocked due to a memory limit - * and a producer timeout is set on the connection. - * @throws Exception - */ - public void testBlockedProducerConnectionTimeout() throws Exception { - final ActiveMQConnection cx = (ActiveMQConnection)createConnection(); - final ActiveMQDestination queue = createDestination("testqueue"); - - // we should not take longer than 10 seconds to return from send - cx.setSendTimeout(10000); - - Runnable r = new Runnable() { - public void run() { - try { - LOG.info("Sender thread starting"); - Session session = cx.createSession(false, 1); - MessageProducer producer = session.createProducer(queue); - producer.setDeliveryMode(DeliveryMode.PERSISTENT); - - TextMessage message = session.createTextMessage(createMessageText()); - for(int count=0; count 0); - } + private final int messageSize=1024*64; + private final int messageCount=10000; + private final AtomicInteger exceptionCount = new AtomicInteger(0); + /** + * Test the case where the broker is blocked due to a memory limit + * and a producer timeout is set on the connection. + * @throws Exception + */ + public void testBlockedProducerConnectionTimeout() throws Exception { + final ActiveMQConnection cx = (ActiveMQConnection)createConnection(); + final ActiveMQDestination queue = createDestination("testqueue"); - /** - * Test the case where the broker is blocked due to a memory limit - * with a fail timeout - * @throws Exception - */ - public void testBlockedProducerUsageSendFailTimeout() throws Exception { - final ActiveMQConnection cx = (ActiveMQConnection)createConnection(); - final ActiveMQDestination queue = createDestination("testqueue"); + // we should not take longer than 10 seconds to return from send + cx.setSendTimeout(10000); - broker.getSystemUsage().setSendFailIfNoSpaceAfterTimeout(5000); - Runnable r = new Runnable() { - public void run() { - try { - LOG.info("Sender thread starting"); - Session session = cx.createSession(false, 1); - MessageProducer producer = session.createProducer(queue); - producer.setDeliveryMode(DeliveryMode.PERSISTENT); + Runnable r = new Runnable() { + public void run() { + try { + LOG.info("Sender thread starting"); + Session session = cx.createSession(false, 1); + MessageProducer producer = session.createProducer(queue); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); - TextMessage message = session.createTextMessage(createMessageText()); - for(int count=0; count 0); - } + } + }; + cx.start(); + Thread producerThread = new Thread(r); + producerThread.start(); + producerThread.join(30000); + cx.close(); + // We should have a few timeout exceptions as memory store will fill up + assertTrue("No exception from the broker", exceptionCount.get() > 0); + } - protected void setUp() throws Exception { - exceptionCount.set(0); - bindAddress = "tcp://localhost:61616"; - broker = createBroker(); - broker.setDeleteAllMessagesOnStartup(true); - broker.getSystemUsage().getMemoryUsage().setLimit(5*1024*1024); + /** + * Test the case where the broker is blocked due to a memory limit + * with a fail timeout + * @throws Exception + */ + public void testBlockedProducerUsageSendFailTimeout() throws Exception { + final ActiveMQConnection cx = (ActiveMQConnection)createConnection(); + final ActiveMQDestination queue = createDestination("testqueue"); - super.setUp(); - } + broker.getSystemUsage().setSendFailIfNoSpaceAfterTimeout(5000); + Runnable r = new Runnable() { + public void run() { + try { + LOG.info("Sender thread starting"); + Session session = cx.createSession(false, 1); + MessageProducer producer = session.createProducer(queue); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); - private String createMessageText() { - StringBuffer buffer = new StringBuffer(); - buffer.append(""); - for (int i = buffer.length(); i < messageSize; i++) { - buffer.append('X'); - } - buffer.append(""); - return buffer.toString(); - } - - } + TextMessage message = session.createTextMessage(createMessageText()); + for(int count=0; count 0); + } + + protected void setUp() throws Exception { + exceptionCount.set(0); + bindAddress = "tcp://localhost:0"; + broker = createBroker(); + broker.setDeleteAllMessagesOnStartup(true); + broker.getSystemUsage().getMemoryUsage().setLimit(5*1024*1024); + + super.setUp(); + } + + @Override + protected ConnectionFactory createConnectionFactory() throws Exception { + return new ActiveMQConnectionFactory( + broker.getTransportConnectors().get(0).getPublishableConnectString()); + } + + private String createMessageText() { + StringBuffer buffer = new StringBuffer(); + buffer.append(""); + for (int i = buffer.length(); i < messageSize; i++) { + buffer.append('X'); + } + buffer.append(""); + return buffer.toString(); + } + +} diff --git a/activemq-core/src/test/java/org/apache/activemq/bugs/MessageExpirationReaperTest.java b/activemq-core/src/test/java/org/apache/activemq/bugs/MessageExpirationReaperTest.java index e3288098a9..c8d946be9b 100644 --- a/activemq-core/src/test/java/org/apache/activemq/bugs/MessageExpirationReaperTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/bugs/MessageExpirationReaperTest.java @@ -38,101 +38,101 @@ import javax.management.ObjectName; /** * Test to determine if expired messages are being reaped if there is - * no active consumer connected to the broker. - * - * @author bsnyder - * + * no active consumer connected to the broker. */ public class MessageExpirationReaperTest { - - protected BrokerService broker; - protected ConnectionFactory factory; - protected ActiveMQConnection connection; - protected String destinationName = "TEST.Q"; - protected String brokerUrl = "tcp://localhost:61616"; - protected String brokerName = "testBroker"; - + + private BrokerService broker; + private ConnectionFactory factory; + private ActiveMQConnection connection; + private final String destinationName = "TEST.Q"; + private final String brokerUrl = "tcp://localhost:0"; + private final String brokerName = "testBroker"; + private String connectionUri; + @Before public void init() throws Exception { createBroker(); - + + connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString(); + factory = createConnectionFactory(); connection = (ActiveMQConnection) factory.createConnection(); connection.start(); } - + @After public void cleanUp() throws Exception { connection.close(); broker.stop(); } - + protected void createBroker() throws Exception { broker = new BrokerService(); broker.setDeleteAllMessagesOnStartup(true); broker.setBrokerName(brokerName); broker.addConnector(brokerUrl); - + PolicyMap policyMap = new PolicyMap(); PolicyEntry defaultEntry = new PolicyEntry(); defaultEntry.setExpireMessagesPeriod(500); policyMap.setDefaultEntry(defaultEntry); broker.setDestinationPolicy(policyMap); - + broker.start(); } - + protected ConnectionFactory createConnectionFactory() throws Exception { - return new ActiveMQConnectionFactory(brokerUrl); + return new ActiveMQConnectionFactory(connectionUri); } - + protected Session createSession() throws Exception { - return connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + return connection.createSession(false, Session.AUTO_ACKNOWLEDGE); } - + @Test public void testExpiredMessageReaping() throws Exception { - + Session producerSession = createSession(); ActiveMQDestination destination = (ActiveMQDestination) producerSession.createQueue(destinationName); MessageProducer producer = producerSession.createProducer(destination); producer.setTimeToLive(1000); - + final int count = 3; - // Send some messages with an expiration + // Send some messages with an expiration for (int i = 0; i < count; i++) { TextMessage message = producerSession.createTextMessage("" + i); producer.send(message); } - - // Let the messages expire + + // Let the messages expire Thread.sleep(2000); - + DestinationViewMBean view = createView(destination); - + assertEquals("Incorrect inflight count: " + view.getInFlightCount(), 0, view.getInFlightCount()); assertEquals("Incorrect queue size count", 0, view.getQueueSize()); - assertEquals("Incorrect expired size count", view.getEnqueueCount(), view.getExpiredCount()); - - // Send more messages with an expiration + assertEquals("Incorrect expired size count", view.getEnqueueCount(), view.getExpiredCount()); + + // Send more messages with an expiration for (int i = 0; i < count; i++) { TextMessage message = producerSession.createTextMessage("" + i); producer.send(message); } - - // Let the messages expire + + // Let the messages expire Thread.sleep(2000); - - // Simply browse the queue + + // Simply browse the queue Session browserSession = createSession(); QueueBrowser browser = browserSession.createBrowser((Queue) destination); - assertFalse("no message in the browser", browser.getEnumeration().hasMoreElements()); - - // The messages expire and should be reaped because of the presence of - // the queue browser + assertFalse("no message in the browser", browser.getEnumeration().hasMoreElements()); + + // The messages expire and should be reaped because of the presence of + // the queue browser assertEquals("Wrong inFlightCount: " + view.getInFlightCount(), 0, view.getInFlightCount()); } - + protected DestinationViewMBean createView(ActiveMQDestination destination) throws Exception { String domain = "org.apache.activemq"; ObjectName name; diff --git a/activemq-core/src/test/java/org/apache/activemq/bugs/OutOfOrderTestCase.java b/activemq-core/src/test/java/org/apache/activemq/bugs/OutOfOrderTestCase.java index 413ffe00cf..34e3866749 100644 --- a/activemq-core/src/test/java/org/apache/activemq/bugs/OutOfOrderTestCase.java +++ b/activemq-core/src/test/java/org/apache/activemq/bugs/OutOfOrderTestCase.java @@ -35,44 +35,43 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class OutOfOrderTestCase extends TestCase { - - private static final Logger log = LoggerFactory.getLogger(OutOfOrderTestCase.class); - - public static final String BROKER_URL = "tcp://localhost:61616"; - private static final int PREFETCH = 10; - private static final String CONNECTION_URL = BROKER_URL + "?jms.prefetchPolicy.all=" + PREFETCH; - public static final String QUEUE_NAME = "QUEUE"; - private static final String DESTINATION = "QUEUE?consumer.exclusive=true"; - - BrokerService brokerService; - Session session; - Connection connection; - - int seq = 0; - - public void setUp() throws Exception { - brokerService = new BrokerService(); - brokerService.setUseJmx(true); - brokerService.addConnector(BROKER_URL); - brokerService.deleteAllMessages(); - brokerService.start(); - - ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(CONNECTION_URL); - connection = connectionFactory.createConnection(); - connection.start(); - session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); - - } - + private static final Logger log = LoggerFactory.getLogger(OutOfOrderTestCase.class); - protected void tearDown() throws Exception { - session.close(); - connection.close(); - brokerService.stop(); - } + private static final String BROKER_URL = "tcp://localhost:0"; + private static final int PREFETCH = 10; + private static final String CONNECTION_URL_OPTIONS = "?jms.prefetchPolicy.all=" + PREFETCH; + private static final String DESTINATION = "QUEUE?consumer.exclusive=true"; + private BrokerService brokerService; + private Session session; + private Connection connection; + private String connectionUri; + + private int seq = 0; + + public void setUp() throws Exception { + brokerService = new BrokerService(); + brokerService.setUseJmx(true); + brokerService.addConnector(BROKER_URL); + brokerService.deleteAllMessages(); + brokerService.start(); + brokerService.waitUntilStarted(); + + connectionUri = brokerService.getTransportConnectors().get(0).getPublishableConnectString(); + + ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(connectionUri + CONNECTION_URL_OPTIONS); + connection = connectionFactory.createConnection(); + connection.start(); + session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + } + + protected void tearDown() throws Exception { + session.close(); + connection.close(); + brokerService.stop(); + } public void testOrder() throws Exception { @@ -102,7 +101,7 @@ public class OutOfOrderTestCase extends TestCase { log.info("Consuming messages 20-29 . . ."); consumeBatch(); } - + protected void consumeBatch() throws Exception { Destination destination = session.createQueue(DESTINATION); final MessageConsumer messageConsumer = session.createConsumer(destination); @@ -118,15 +117,15 @@ public class OutOfOrderTestCase extends TestCase { } } - private String toString(final Message message) throws JMSException { - String ret = "received message '" + ((TextMessage) message).getText() + "' - " + message.getJMSMessageID(); - if (message.getJMSRedelivered()) - ret += " (redelivered)"; - return ret; - - } + private String toString(final Message message) throws JMSException { + String ret = "received message '" + ((TextMessage) message).getText() + "' - " + message.getJMSMessageID(); + if (message.getJMSRedelivered()) + ret += " (redelivered)"; + return ret; - private static String createMessageText(final int index) { - return "message #" + index; - } + } + + private static String createMessageText(final int index) { + return "message #" + index; + } } diff --git a/activemq-core/src/test/java/org/apache/activemq/bugs/QueueWorkerPrefetchTest.java b/activemq-core/src/test/java/org/apache/activemq/bugs/QueueWorkerPrefetchTest.java index feba861492..80adaed328 100644 --- a/activemq-core/src/test/java/org/apache/activemq/bugs/QueueWorkerPrefetchTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/bugs/QueueWorkerPrefetchTest.java @@ -41,21 +41,28 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Test case demonstrating situation where messages are not delivered to consumers. + * Test case demonstrating situation where messages are not delivered to + * consumers. */ -public class QueueWorkerPrefetchTest extends TestCase implements MessageListener -{ - private static final Logger LOG = LoggerFactory.getLogger(QueueWorkerPrefetchTest.class); +public class QueueWorkerPrefetchTest extends TestCase implements + MessageListener { + private static final Logger LOG = LoggerFactory + .getLogger(QueueWorkerPrefetchTest.class); private static final int BATCH_SIZE = 10; - private static final long WAIT_TIMEOUT = 1000*10; + private static final long WAIT_TIMEOUT = 1000 * 10; /** The connection URL. */ - private static final String CONNECTION_URL = "tcp://localhost:61616"; + private static final String BROKER_BIND_ADDRESS = "tcp://localhost:0"; - /** The queue prefetch size to use. A value greater than 1 seems to make things work. */ + /** + * The queue prefetch size to use. A value greater than 1 seems to make + * things work. + */ private static final int QUEUE_PREFETCH_SIZE = 1; - /** The number of workers to use. A single worker with a prefetch of 1 works. */ + /** + * The number of workers to use. A single worker with a prefetch of 1 works. + */ private static final int NUM_WORKERS = 2; /** Embedded JMS broker. */ @@ -68,32 +75,37 @@ public class QueueWorkerPrefetchTest extends TestCase implements MessageListener private MessageConsumer masterItemConsumer; /** The number of acks received by the master. */ - private AtomicLong acksReceived = new AtomicLong(0); + private final AtomicLong acksReceived = new AtomicLong(0); - private AtomicReference latch = new AtomicReference(); + private final AtomicReference latch = new AtomicReference(); + + private String connectionUri; /** Messages sent to the work-item queue. */ - private static class WorkMessage implements Serializable - { + private static class WorkMessage implements Serializable { + private static final long serialVersionUID = 1L; private final int id; + public WorkMessage(int id) { this.id = id; } + @Override public String toString() { - return "Work: "+id; + return "Work: " + id; } } /** - * The worker process. Consume messages from the work-item queue, possibly creating - * more messages to submit to the work-item queue. For each work item, send an ack - * to the master. + * The worker process. Consume messages from the work-item queue, possibly + * creating more messages to submit to the work-item queue. For each work + * item, send an ack to the master. */ - private static class Worker implements MessageListener - { - - /** Counter shared between workers to decided when new work-item messages are created. */ + private static class Worker implements MessageListener { + /** + * Counter shared between workers to decided when new work-item messages + * are created. + */ private static AtomicInteger counter = new AtomicInteger(0); /** Session to use. */ @@ -105,54 +117,48 @@ public class QueueWorkerPrefetchTest extends TestCase implements MessageListener /** Producer for sending new work items to the work-items queue. */ private MessageProducer workItemProducer; - public Worker(Session session) - throws JMSException - { + public Worker(Session session) throws JMSException { this.session = session; - masterItemProducer = session.createProducer(session.createQueue("master-item")); + masterItemProducer = session.createProducer(session + .createQueue("master-item")); Queue workItemQueue = session.createQueue("work-item"); workItemProducer = session.createProducer(workItemQueue); - MessageConsumer workItemConsumer = session.createConsumer(workItemQueue); + MessageConsumer workItemConsumer = session + .createConsumer(workItemQueue); workItemConsumer.setMessageListener(this); } - public void onMessage(javax.jms.Message message) - { - try - { - WorkMessage work = (WorkMessage)((ObjectMessage)message).getObject(); - + public void onMessage(javax.jms.Message message) { + try { + WorkMessage work = (WorkMessage) ((ObjectMessage) message) + .getObject(); + long c = counter.incrementAndGet(); - + // Don't create a new work item for every BATCH_SIZE message. */ - if (c % BATCH_SIZE != 0) - { + if (c % BATCH_SIZE != 0) { // Send new work item to work-item queue. - workItemProducer.send(session.createObjectMessage( - new WorkMessage(work.id+1))); + workItemProducer.send(session + .createObjectMessage(new WorkMessage(work.id + 1))); } // Send ack to master. masterItemProducer.send(session.createObjectMessage(work)); - } - catch (JMSException e) - { + } catch (JMSException e) { throw new IllegalStateException("Something has gone wrong", e); } } /** Close of JMS resources used by worker. */ - public void close() throws JMSException - { + public void close() throws JMSException { masterItemProducer.close(); workItemProducer.close(); session.close(); } } - /** Master message handler. Process ack messages. */ - public void onMessage(javax.jms.Message message) - { + /** Master message handler. Process ack messages. */ + public void onMessage(javax.jms.Message message) { long acks = acksReceived.incrementAndGet(); latch.get().countDown(); if (acks % 1 == 0) { @@ -160,68 +166,72 @@ public class QueueWorkerPrefetchTest extends TestCase implements MessageListener } } - protected void setUp() throws Exception - { + protected void setUp() throws Exception { // Create the message broker. super.setUp(); broker = new BrokerService(); broker.setPersistent(false); broker.setUseJmx(true); - broker.addConnector(CONNECTION_URL); + broker.addConnector(BROKER_BIND_ADDRESS); broker.start(); + broker.waitUntilStarted(); + + connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString(); } - protected void tearDown() throws Exception - { + protected void tearDown() throws Exception { // Shut down the message broker. broker.deleteAllMessages(); broker.stop(); super.tearDown(); } - public void testActiveMQ() - throws Exception - { + public void testActiveMQ() throws Exception { // Create the connection to the broker. - ActiveMQConnectionFactory connectionFactory = - new ActiveMQConnectionFactory(CONNECTION_URL); + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(connectionUri); ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy(); prefetchPolicy.setQueuePrefetch(QUEUE_PREFETCH_SIZE); connectionFactory.setPrefetchPolicy(prefetchPolicy); Connection connection = connectionFactory.createConnection(); connection.start(); - Session masterSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - workItemProducer = masterSession.createProducer(masterSession.createQueue("work-item")); - masterItemConsumer = masterSession.createConsumer(masterSession.createQueue("master-item")); + Session masterSession = connection.createSession(false, + Session.AUTO_ACKNOWLEDGE); + workItemProducer = masterSession.createProducer(masterSession + .createQueue("work-item")); + masterItemConsumer = masterSession.createConsumer(masterSession + .createQueue("master-item")); masterItemConsumer.setMessageListener(this); // Create the workers. Worker[] workers = new Worker[NUM_WORKERS]; - for (int i = 0; i < NUM_WORKERS; i++) - { - workers[i] = new Worker(connection.createSession(false, Session.AUTO_ACKNOWLEDGE)); + for (int i = 0; i < NUM_WORKERS; i++) { + workers[i] = new Worker(connection.createSession(false, + Session.AUTO_ACKNOWLEDGE)); } - // Send a message to the work queue, and wait for the BATCH_SIZE acks from the workers. + // Send a message to the work queue, and wait for the BATCH_SIZE acks + // from the workers. acksReceived.set(0); latch.set(new CountDownLatch(BATCH_SIZE)); - workItemProducer.send(masterSession.createObjectMessage(new WorkMessage(1))); - + workItemProducer.send(masterSession + .createObjectMessage(new WorkMessage(1))); + if (!latch.get().await(WAIT_TIMEOUT, TimeUnit.MILLISECONDS)) { fail("First batch only received " + acksReceived + " messages"); } - LOG.info("First batch received"); + LOG.info("First batch received"); - // Send another message to the work queue, and wait for the next 1000 acks. It is + // Send another message to the work queue, and wait for the next 1000 acks. It is // at this point where the workers never get notified of this message, as they - // have a large pending queue. Creating a new worker at this point however will + // have a large pending queue. Creating a new worker at this point however will // receive this new message. acksReceived.set(0); latch.set(new CountDownLatch(BATCH_SIZE)); - workItemProducer.send(masterSession.createObjectMessage(new WorkMessage(1))); - + workItemProducer.send(masterSession + .createObjectMessage(new WorkMessage(1))); + if (!latch.get().await(WAIT_TIMEOUT, TimeUnit.MILLISECONDS)) { fail("Second batch only received " + acksReceived + " messages"); } diff --git a/activemq-core/src/test/java/org/apache/activemq/bugs/SlowConsumerTest.java b/activemq-core/src/test/java/org/apache/activemq/bugs/SlowConsumerTest.java index cb5a9d080b..a2c117e23c 100644 --- a/activemq-core/src/test/java/org/apache/activemq/bugs/SlowConsumerTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/bugs/SlowConsumerTest.java @@ -41,8 +41,8 @@ public class SlowConsumerTest extends TestCase { private static final Logger LOG = LoggerFactory.getLogger(SlowConsumerTest.class); private static final int MESSAGES_COUNT = 10000; - protected int messageLogFrequency = 2500; - protected long messageReceiveTimeout = 10000L; + private final int messageLogFrequency = 2500; + private final long messageReceiveTimeout = 10000L; private Socket stompSocket; private ByteArrayOutputStream inputBuffer; @@ -58,9 +58,10 @@ public class SlowConsumerTest extends TestCase { broker.setUseJmx(true); broker.setDeleteAllMessagesOnStartup(true); - broker.addConnector("tcp://localhost:61616").setName("Default"); + broker.addConnector("tcp://localhost:0").setName("Default"); broker.start(); - ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory( + broker.getTransportConnectors().get(0).getPublishableConnectString()); final Connection connection = factory.createConnection(); connection.start(); diff --git a/activemq-core/src/test/java/org/apache/activemq/bugs/TransactionNotStartedErrorTest.java b/activemq-core/src/test/java/org/apache/activemq/bugs/TransactionNotStartedErrorTest.java index c37c8c97b9..1baff9a709 100644 --- a/activemq-core/src/test/java/org/apache/activemq/bugs/TransactionNotStartedErrorTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/bugs/TransactionNotStartedErrorTest.java @@ -43,8 +43,8 @@ import org.slf4j.LoggerFactory; public class TransactionNotStartedErrorTest extends TestCase { private static final Logger LOG = LoggerFactory.getLogger(TransactionNotStartedErrorTest.class); - - private static int counter = 500; + + private static final int counter = 500; private static int hectorToHaloCtr; private static int xenaToHaloCtr; @@ -54,14 +54,13 @@ public class TransactionNotStartedErrorTest extends TestCase { private static int haloToXenaCtr; private static int haloToTroyCtr; - private String hectorToHalo = "hectorToHalo"; - private String xenaToHalo = "xenaToHalo"; - private String troyToHalo = "troyToHalo"; - - private String haloToHector = "haloToHector"; - private String haloToXena = "haloToXena"; - private String haloToTroy = "haloToTroy"; + private final String hectorToHalo = "hectorToHalo"; + private final String xenaToHalo = "xenaToHalo"; + private final String troyToHalo = "troyToHalo"; + private final String haloToHector = "haloToHector"; + private final String haloToXena = "haloToXena"; + private final String haloToTroy = "haloToTroy"; private BrokerService broker; @@ -72,8 +71,9 @@ public class TransactionNotStartedErrorTest extends TestCase { private final Object lock = new Object(); - public Connection createConnection() throws JMSException { - ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); + public Connection createConnection() throws Exception { + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory( + broker.getTransportConnectors().get(0).getPublishableConnectString()); return factory.createConnection(); } @@ -86,7 +86,7 @@ public class TransactionNotStartedErrorTest extends TestCase { broker.setDeleteAllMessagesOnStartup(true); broker.setPersistent(true); broker.setUseJmx(true); - broker.addConnector("tcp://localhost:61616").setName("Default"); + broker.addConnector("tcp://localhost:0").setName("Default"); broker.start(); LOG.info("Starting broker.."); } @@ -234,12 +234,10 @@ public class TransactionNotStartedErrorTest extends TestCase { } public MessageSender buildTransactionalProducer(String queueName, Connection connection) throws Exception { - return new MessageSender(queueName, connection, true, false); } public Thread buildProducer(Connection connection, final String queueName) throws Exception { - final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); final MessageSender producer = new MessageSender(queueName, connection, false, false); Thread thread = new Thread() { diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/ConcurrentProducerDurableConsumerTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/ConcurrentProducerDurableConsumerTest.java index 963bfa78ae..2f5fb5f99b 100644 --- a/activemq-core/src/test/java/org/apache/activemq/usecases/ConcurrentProducerDurableConsumerTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/ConcurrentProducerDurableConsumerTest.java @@ -326,7 +326,7 @@ public class ConcurrentProducerDurableConsumerTest extends TestSupport { protected BrokerService createBroker() throws Exception { BrokerService brokerService = new BrokerService(); brokerService.setEnableStatistics(false); - brokerService.addConnector("tcp://0.0.0.0:61616"); + brokerService.addConnector("tcp://0.0.0.0:0"); brokerService.setDeleteAllMessagesOnStartup(true); PolicyEntry policy = new PolicyEntry(); @@ -343,9 +343,9 @@ public class ConcurrentProducerDurableConsumerTest extends TestSupport { policyMap.setDefaultEntry(policy); brokerService.setDestinationPolicy(policyMap); - if (false) { - // external mysql works a lot faster - // +// if (false) { +// // external mysql works a lot faster +// // // JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter(); // BasicDataSource ds = new BasicDataSource(); // com.mysql.jdbc.Driver d = new com.mysql.jdbc.Driver(); @@ -358,28 +358,29 @@ public class ConcurrentProducerDurableConsumerTest extends TestSupport { // jdbc.setDataSource(ds); // brokerService.setPersistenceAdapter(jdbc); -/* add mysql bits to the pom in the testing dependencies - - mysql - mysql-connector-java - 5.1.10 - test - - - commons-dbcp - commons-dbcp - 1.2.2 - test - + /* add mysql bits to the pom in the testing dependencies + + mysql + mysql-connector-java + 5.1.10 + test + + + commons-dbcp + commons-dbcp + 1.2.2 + test + */ - } else { +// } else { setDefaultPersistenceAdapter(brokerService); - } +// } return brokerService; } protected ActiveMQConnectionFactory createConnectionFactory() throws Exception { - ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory( + broker.getTransportConnectors().get(0).getPublishableConnectString()); ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy(); prefetchPolicy.setAll(1); factory.setPrefetchPolicy(prefetchPolicy); diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/DispatchMultipleConsumersTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/DispatchMultipleConsumersTest.java index 98b08616be..166049f71e 100644 --- a/activemq-core/src/test/java/org/apache/activemq/usecases/DispatchMultipleConsumersTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/DispatchMultipleConsumersTest.java @@ -1,4 +1,3 @@ - /** * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with @@ -16,6 +15,7 @@ * limitations under the License. */ package org.apache.activemq.usecases; + import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; import javax.jms.Connection; @@ -32,10 +32,8 @@ import org.apache.activemq.command.ActiveMQQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - /** * @author Rajani Chennamaneni - * */ public class DispatchMultipleConsumersTest extends TestCase { private final static Logger logger = LoggerFactory.getLogger(DispatchMultipleConsumersTest.class); @@ -50,10 +48,10 @@ public class DispatchMultipleConsumersTest extends TestCase { AtomicInteger consumedCount; CountDownLatch producerLatch; CountDownLatch consumerLatch; - String brokerURL = "tcp://localhost:61616"; + String brokerURL; String userName = ""; String password = ""; - + @Override protected void setUp() throws Exception { super.setUp(); @@ -61,42 +59,36 @@ public class DispatchMultipleConsumersTest extends TestCase { broker.setPersistent(true); broker.setUseJmx(true); broker.deleteAllMessages(); - broker.addConnector("tcp://localhost:61616"); + broker.addConnector("tcp://localhost:0"); broker.start(); + broker.waitUntilStarted(); dest = new ActiveMQQueue(destinationName); resetCounters(); + brokerURL = broker.getTransportConnectors().get(0).getPublishableConnectString(); } @Override protected void tearDown() throws Exception { -// broker.stop(); + broker.stop(); + broker.waitUntilStopped(); super.tearDown(); } - + private void resetCounters() { sentCount = new AtomicInteger(0); consumedCount = new AtomicInteger(0); producerLatch = new CountDownLatch(producerThreads); consumerLatch = new CountDownLatch(consumerCount); } - + public void testDispatch1() { for (int i = 1; i <= 5; i++) { resetCounters(); dispatch(); - /*try { - System.out.print("Press Enter to continue/finish:"); - //pause to check the counts on JConsole - System.in.read(); - System.in.read(); - } catch (IOException e) { - e.printStackTrace(); - }*/ - //check for consumed messages count assertEquals("Incorrect messages in Iteration " + i, sentCount.get(), consumedCount.get()); } } - + private void dispatch() { startConsumers(); startProducers(); @@ -130,15 +122,13 @@ public class DispatchMultipleConsumersTest extends TestCase { } private class ConsumerThread extends Thread { - Connection conn; Session session; MessageConsumer consumer; public ConsumerThread(Connection conn, String name) { super(); - this.conn = conn; this.setName(name); - logger.info("Created new consumer thread:" + name); + logger.trace("Created new consumer thread:" + name); try { session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); consumer = session.createConsumer(dest); @@ -170,24 +160,25 @@ public class DispatchMultipleConsumersTest extends TestCase { nullCount = 0; } Thread.sleep(100); - logger.info("Message received:" + msg.getJMSMessageID()); + if (logger.isTraceEnabled()) { + logger.trace("Message received:" + msg.getJMSMessageID()); + } msgCount++; } catch (JMSException e) { - logger.error("Failed to consume:", e); + logger.error("Failed to consume:", e); } catch (InterruptedException e) { - logger.error("Interrupted!", e); + logger.error("Interrupted!", e); } } try { consumer.close(); } catch (JMSException e) { - logger.error("Failed to close consumer " + getName(), e); + logger.error("Failed to close consumer " + getName(), e); } consumedCount.addAndGet(msgCount); consumerLatch.countDown(); - logger.info("Consumed " + msgCount + " messages using thread " + getName()); + logger.trace("Consumed " + msgCount + " messages using thread " + getName()); } - } private class ProducerThread extends Thread { @@ -195,12 +186,12 @@ public class DispatchMultipleConsumersTest extends TestCase { Connection conn; Session session; MessageProducer producer; - + public ProducerThread(ActiveMQConnectionFactory connFactory, int count, String name) { super(); this.count = count; this.setName(name); - logger.info("Created new producer thread:" + name); + logger.trace("Created new producer thread:" + name); try { conn = connFactory.createConnection(); conn.start(); @@ -224,12 +215,13 @@ public class DispatchMultipleConsumersTest extends TestCase { } catch (JMSException e) { logger.error(e.getMessage(), e); } catch (InterruptedException e) { - logger.error("Interrupted!", e); + logger.error("Interrupted!", e); } sentCount.addAndGet(i); producerLatch.countDown(); - logger.info("Sent " + i + " messages from thread " + getName()); + if (logger.isTraceEnabled()) { + logger.trace("Sent " + i + " messages from thread " + getName()); + } } } - } diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/MessageGroupCloseTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/MessageGroupCloseTest.java index 4e008de4a7..7d90c83549 100644 --- a/activemq-core/src/test/java/org/apache/activemq/usecases/MessageGroupCloseTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/MessageGroupCloseTest.java @@ -48,7 +48,6 @@ public class MessageGroupCloseTest extends TestCase { private HashSet closedGroups2 = new HashSet(); // with the prefetch too high, this bug is not realized private static final String connStr = - //"tcp://localhost:61616"; "vm://localhost?broker.persistent=false&broker.useJmx=false&jms.prefetchPolicy.all=1"; public void testNewConsumer() throws JMSException, InterruptedException {