diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/JmsClientAckTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/JmsClientAckTest.java index ef33f9ac20..d6ce978cf1 100755 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/JmsClientAckTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/JmsClientAckTest.java @@ -16,6 +16,9 @@ */ package org.apache.activemq; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; + import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.Message; @@ -24,27 +27,39 @@ import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; + /** - * + * Test for client ACK support */ -public class JmsClientAckTest extends TestSupport { +public class JmsClientAckTest { + + @Rule + public TestName name = new TestName(); private Connection connection; - protected void setUp() throws Exception { - super.setUp(); - connection = createConnection(); + @Before + public void setUp() throws Exception { + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory( + "vm://localhost?broker.persistent=false&broker.useJmx=false"); + + connection = factory.createConnection();; } /** * @see junit.framework.TestCase#tearDown() */ - protected void tearDown() throws Exception { + @After + public void tearDown() throws Exception { if (connection != null) { connection.close(); connection = null; } - super.tearDown(); } /** @@ -52,6 +67,7 @@ public class JmsClientAckTest extends TestSupport { * * @throws JMSException */ + @Test(timeout = 60000) public void testAckedMessageAreConsumed() throws JMSException { connection.start(); Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); @@ -71,7 +87,7 @@ public class JmsClientAckTest extends TestSupport { // Attempt to Consume the message... consumer = session.createConsumer(queue); - msg = consumer.receive(1000); + msg = consumer.receive(500); assertNull(msg); session.close(); @@ -82,6 +98,7 @@ public class JmsClientAckTest extends TestSupport { * * @throws JMSException */ + @Test(timeout = 60000) public void testLastMessageAcked() throws JMSException { connection.start(); Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); @@ -96,7 +113,7 @@ public class JmsClientAckTest extends TestSupport { Message msg = consumer.receive(1000); assertNotNull(msg); msg = consumer.receive(1000); - assertNotNull(msg); + assertNotNull(msg); msg = consumer.receive(1000); assertNotNull(msg); msg.acknowledge(); @@ -107,17 +124,18 @@ public class JmsClientAckTest extends TestSupport { // Attempt to Consume the message... consumer = session.createConsumer(queue); - msg = consumer.receive(1000); + msg = consumer.receive(500); assertNull(msg); session.close(); } - + /** * Tests if unacknowledged messages are being re-delivered when the consumer connects again. - * + * * @throws JMSException */ + @Test(timeout = 60000) public void testUnAckedMessageAreNotConsumedOnSessionClose() throws JMSException { connection.start(); Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); @@ -128,24 +146,23 @@ public class JmsClientAckTest extends TestSupport { // Consume the message... MessageConsumer consumer = session.createConsumer(queue); Message msg = consumer.receive(1000); - assertNotNull(msg); + assertNotNull(msg); // Don't ack the message. - + // Reset the session. This should cause the unacknowledged message to be re-delivered. session.close(); session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); - + // Attempt to Consume the message... consumer = session.createConsumer(queue); msg = consumer.receive(2000); - assertNotNull(msg); + assertNotNull(msg); msg.acknowledge(); - + session.close(); } protected String getQueueName() { - return getClass().getName() + "." + getName(); + return getClass().getName() + "." + name.getMethodName(); } - } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/RemoveDestinationTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/RemoveDestinationTest.java index 009221b53f..632294cd2b 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/RemoveDestinationTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/RemoveDestinationTest.java @@ -16,15 +16,13 @@ */ package org.apache.activemq; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import java.net.URI; +import java.util.concurrent.TimeUnit; import javax.jms.Connection; -import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; @@ -34,11 +32,11 @@ import javax.jms.Topic; import javax.management.ObjectName; import org.apache.activemq.advisory.DestinationSource; -import org.apache.activemq.broker.BrokerFactory; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.jmx.DestinationViewMBean; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.util.Wait; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -46,13 +44,16 @@ import org.junit.Test; public class RemoveDestinationTest { private static final String VM_BROKER_URL = "vm://localhost?create=false"; - private static final String BROKER_URL = "broker:vm://localhost?broker.persistent=false&broker.useJmx=true"; BrokerService broker; @Before public void setUp() throws Exception { - broker = BrokerFactory.createBroker(new URI(BROKER_URL)); + broker = new BrokerService(); + broker.setPersistent(false); + broker.setUseJmx(true); + broker.getManagementContext().setCreateConnector(false); + broker.setSchedulerSupport(false); broker.start(); broker.waitUntilStarted(); } @@ -65,7 +66,7 @@ public class RemoveDestinationTest { } private Connection createConnection(final boolean start) throws JMSException { - ConnectionFactory cf = new ActiveMQConnectionFactory(VM_BROKER_URL); + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(VM_BROKER_URL); Connection conn = cf.createConnection(); if (start) { conn.start(); @@ -73,53 +74,94 @@ public class RemoveDestinationTest { return conn; } - @Test + @Test(timeout = 60000) public void testRemoveDestinationWithoutSubscriber() throws Exception { ActiveMQConnection amqConnection = (ActiveMQConnection) createConnection(true); - DestinationSource destinationSource = amqConnection.getDestinationSource(); + + final DestinationSource destinationSource = amqConnection.getDestinationSource(); Session session = amqConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic("TEST.FOO"); MessageProducer producer = session.createProducer(topic); + final int consumerCount = broker.getAdminView().getTopicSubscribers().length; MessageConsumer consumer = session.createConsumer(topic); TextMessage msg = session.createTextMessage("Hellow World"); producer.send(msg); assertNotNull(consumer.receive(5000)); - Thread.sleep(1000); + final ActiveMQTopic amqTopic = (ActiveMQTopic) topic; - ActiveMQTopic amqTopic = (ActiveMQTopic) topic; - assertTrue(destinationSource.getTopics().contains(amqTopic)); + assertTrue("Destination never discovered", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return destinationSource.getTopics().contains(amqTopic); + } + }, TimeUnit.SECONDS.toMillis(30), TimeUnit.MILLISECONDS.toMillis(100))); consumer.close(); producer.close(); session.close(); - Thread.sleep(3000); + assertTrue("Subscriber still active", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return broker.getAdminView().getTopicSubscribers().length == consumerCount; + } + }, TimeUnit.SECONDS.toMillis(30), TimeUnit.MILLISECONDS.toMillis(100))); + amqConnection.destroyDestination((ActiveMQDestination) topic); - Thread.sleep(3000); - assertFalse(destinationSource.getTopics().contains(amqTopic)); + + assertTrue("Destination still active", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return !destinationSource.getTopics().contains(amqTopic); + } + }, TimeUnit.SECONDS.toMillis(30), TimeUnit.MILLISECONDS.toMillis(100))); + + assertTrue("Destination never unregistered", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return !destinationPresentInAdminView(broker, amqTopic); + } + }, TimeUnit.SECONDS.toMillis(30), TimeUnit.MILLISECONDS.toMillis(100))); } - @Test + @Test(timeout = 60000) public void testRemoveDestinationWithSubscriber() throws Exception { ActiveMQConnection amqConnection = (ActiveMQConnection) createConnection(true); - DestinationSource destinationSource = amqConnection.getDestinationSource(); + final DestinationSource destinationSource = amqConnection.getDestinationSource(); Session session = amqConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic("TEST.FOO"); MessageProducer producer = session.createProducer(topic); + final int consumerCount = broker.getAdminView().getTopicSubscribers().length; MessageConsumer consumer = session.createConsumer(topic); TextMessage msg = session.createTextMessage("Hellow World"); producer.send(msg); assertNotNull(consumer.receive(5000)); - Thread.sleep(1000); - ActiveMQTopic amqTopic = (ActiveMQTopic) topic; + final ActiveMQTopic amqTopic = (ActiveMQTopic) topic; - assertTrue(destinationPresentInAdminView(broker, amqTopic)); - assertTrue(destinationSource.getTopics().contains(amqTopic)); + assertTrue("Destination never registered", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return destinationPresentInAdminView(broker, amqTopic); + } + }, TimeUnit.SECONDS.toMillis(30), TimeUnit.MILLISECONDS.toMillis(100))); + + assertTrue("Destination never discovered", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return destinationSource.getTopics().contains(amqTopic); + } + }, TimeUnit.SECONDS.toMillis(30), TimeUnit.MILLISECONDS.toMillis(100))); // This line generates a broker error since the consumer is still active. try { @@ -129,23 +171,53 @@ public class RemoveDestinationTest { assertTrue(expected.getMessage().indexOf(amqTopic.getTopicName()) != -1); } - Thread.sleep(3000); + assertTrue("Destination never registered", Wait.waitFor(new Wait.Condition() { - assertTrue(destinationSource.getTopics().contains(amqTopic)); - assertTrue(destinationPresentInAdminView(broker, amqTopic)); + @Override + public boolean isSatisified() throws Exception { + return destinationPresentInAdminView(broker, amqTopic); + } + }, TimeUnit.SECONDS.toMillis(30), TimeUnit.MILLISECONDS.toMillis(100))); + + assertTrue("Destination never discovered", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return destinationSource.getTopics().contains(amqTopic); + } + }, TimeUnit.SECONDS.toMillis(30), TimeUnit.MILLISECONDS.toMillis(100))); consumer.close(); producer.close(); session.close(); - Thread.sleep(3000); + assertTrue("Subscriber still active", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return broker.getAdminView().getTopicSubscribers().length == consumerCount; + } + }, TimeUnit.SECONDS.toMillis(30), TimeUnit.MILLISECONDS.toMillis(100))); // The destination will not be removed with this call, but if you remove // the call above that generates the error it will. amqConnection.destroyDestination(amqTopic); - Thread.sleep(3000); - assertFalse(destinationSource.getTopics().contains(amqTopic)); - assertFalse(destinationPresentInAdminView(broker, amqTopic)); + + assertTrue("Destination still active", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return !destinationSource.getTopics().contains(amqTopic); + } + }, TimeUnit.SECONDS.toMillis(30), TimeUnit.MILLISECONDS.toMillis(100))); + + assertTrue("Destination never unregistered", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return !destinationPresentInAdminView(broker, amqTopic); + } + }, TimeUnit.SECONDS.toMillis(30), TimeUnit.MILLISECONDS.toMillis(100))); } private boolean destinationPresentInAdminView(BrokerService broker2, ActiveMQTopic amqTopic) throws Exception {