From e1e8c5b083e0c9f6e6c6f4dd25d35ad039fc4ca1 Mon Sep 17 00:00:00 2001 From: Timothy Bish Date: Wed, 11 Dec 2013 16:58:08 -0500 Subject: [PATCH] Some additional JMS Tests focused on Topics. Useful when updating the AMQP JMS Client version as it shows some new problem in the latest SNAPSHOT builds. --- .../transport/amqp/JMSClientTest.java | 182 ++++++++++++++++-- 1 file changed, 168 insertions(+), 14 deletions(-) diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java index e35127df50..ebed8d6367 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java @@ -23,7 +23,9 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.util.Enumeration; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import javax.jms.Connection; import javax.jms.DeliveryMode; @@ -31,6 +33,7 @@ import javax.jms.ExceptionListener; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; +import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.QueueBrowser; import javax.jms.Session; @@ -38,8 +41,10 @@ import javax.jms.TextMessage; import org.apache.activemq.broker.jmx.QueueViewMBean; import org.apache.activemq.transport.amqp.joram.ActiveMQAdmin; +import org.apache.activemq.util.Wait; import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl; import org.apache.qpid.amqp_1_0.jms.impl.QueueImpl; +import org.apache.qpid.amqp_1_0.jms.impl.TopicImpl; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TestName; @@ -347,26 +352,175 @@ public class JMSClientTest extends AmqpTestSupport { @Test(timeout=30000) public void testTTL() throws Exception { - QueueImpl queue = new QueueImpl("queue://" + name); + Connection connection = null; + try { + QueueImpl queue = new QueueImpl("queue://" + name); + connection = createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + connection.start(); + MessageProducer producer = session.createProducer(queue); + producer.setTimeToLive(1000); + Message toSend = session.createTextMessage("Sample text"); + producer.send(toSend); + MessageConsumer consumer = session.createConsumer(queue); + Message received = consumer.receive(5000); + assertNotNull(received); + LOG.info("Message JMSExpiration = {}", received.getJMSExpiration()); + producer.setTimeToLive(100); + producer.send(toSend); + TimeUnit.SECONDS.sleep(2); + received = consumer.receive(5000); + if (received != null) { + LOG.info("Message JMSExpiration = {} JMSTimeStamp = {} TTL = {}", + new Object[] { received.getJMSExpiration(), received.getJMSTimestamp(), + received.getJMSExpiration() - received.getJMSTimestamp()}); + } + assertNull(received); + } finally { + connection.close(); + } + } + + @Test(timeout=30000) + public void testDurableConsumerAsync() throws Exception { + ActiveMQAdmin.enableJMSFrameTracing(); + TopicImpl topic = new TopicImpl("topic://"+name); + final CountDownLatch latch = new CountDownLatch(1); + final AtomicReference received = new AtomicReference(); + Connection connection = createConnection(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - connection.start(); - MessageProducer producer = session.createProducer(queue); - producer.setTimeToLive(1000); - Message toSend = session.createTextMessage("Sample text"); - producer.send(toSend); - MessageConsumer consumer = session.createConsumer(queue); - Message received = consumer.receive(5000); - assertNotNull(received); - producer.setTimeToLive(100); - producer.send(toSend); - TimeUnit.SECONDS.sleep(1); - assertNull(consumer.receive(5000)); + { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createDurableSubscriber(topic, "DurbaleTopic"); + consumer.setMessageListener(new MessageListener() { + + @Override + public void onMessage(Message message) { + received.set(message); + latch.countDown(); + } + }); + + MessageProducer producer = session.createProducer(topic); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + connection.start(); + + TextMessage message = session.createTextMessage(); + message.setText("hello"); + producer.send(message); + + assertTrue(latch.await(10, TimeUnit.SECONDS)); + assertNotNull("Should have received a message by now.", received.get()); + assertTrue("Should be an instance of TextMessage", received.get() instanceof TextMessage); + } + connection.close(); + } + + @Test(timeout=30000) + public void testDurableConsumerSync() throws Exception { + ActiveMQAdmin.enableJMSFrameTracing(); + TopicImpl topic = new TopicImpl("topic://"+name); + + Connection connection = createConnection(); + { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + final MessageConsumer consumer = session.createDurableSubscriber(topic, "DurbaleTopic"); + MessageProducer producer = session.createProducer(topic); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + connection.start(); + + TextMessage message = session.createTextMessage(); + message.setText("hello"); + producer.send(message); + + final AtomicReference msg = new AtomicReference(); + assertTrue(Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + msg.set(consumer.receiveNoWait()); + return msg.get() != null; + } + })); + + assertNotNull("Should have received a message by now.", msg.get()); + assertTrue("Should be an instance of TextMessage", msg.get() instanceof TextMessage); + } + connection.close(); + } + + @Test(timeout=30000) + public void testTopicConsumerAsync() throws Exception { + ActiveMQAdmin.enableJMSFrameTracing(); + TopicImpl topic = new TopicImpl("topic://"+name); + final CountDownLatch latch = new CountDownLatch(1); + final AtomicReference received = new AtomicReference(); + + Connection connection = createConnection(); + { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createConsumer(topic); + consumer.setMessageListener(new MessageListener() { + + @Override + public void onMessage(Message message) { + received.set(message); + latch.countDown(); + } + }); + + MessageProducer producer = session.createProducer(topic); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + connection.start(); + + TextMessage message = session.createTextMessage(); + message.setText("hello"); + producer.send(message); + + assertTrue(latch.await(10, TimeUnit.SECONDS)); + assertNotNull("Should have received a message by now.", received.get()); + assertTrue("Should be an instance of TextMessage", received.get() instanceof TextMessage); + } + connection.close(); + } + + @Test(timeout=45000) + public void testTopicConsumerSync() throws Exception { + ActiveMQAdmin.enableJMSFrameTracing(); + TopicImpl topic = new TopicImpl("topic://"+name); + + Connection connection = createConnection(); + { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + final MessageConsumer consumer = session.createConsumer(topic); + MessageProducer producer = session.createProducer(topic); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + connection.start(); + + TextMessage message = session.createTextMessage(); + message.setText("hello"); + producer.send(message); + + final AtomicReference msg = new AtomicReference(); + assertTrue(Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + msg.set(consumer.receiveNoWait()); + return msg.get() != null; + } + })); + + assertNotNull("Should have received a message by now.", msg.get()); + assertTrue("Should be an instance of TextMessage", msg.get() instanceof TextMessage); + } + connection.close(); } private Connection createConnection() throws JMSException { final ConnectionFactoryImpl factory = new ConnectionFactoryImpl("localhost", port, "admin", "password"); final Connection connection = factory.createConnection(); + connection.setClientID(name.toString()); connection.setExceptionListener(new ExceptionListener() { @Override public void onException(JMSException exception) {